[ 
https://issues.apache.org/jira/browse/FLINK-1514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14323032#comment-14323032
 ] 

ASF GitHub Bot commented on FLINK-1514:
---------------------------------------

Github user balidani commented on a diff in the pull request:

    https://github.com/apache/flink/pull/408#discussion_r24763748
  
    --- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.test;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.graph.gsa.ApplyFunction;
    +import org.apache.flink.graph.gsa.GatherFunction;
    +import org.apache.flink.graph.gsa.SumFunction;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.util.Collector;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +
    +@RunWith(Parameterized.class)
    +public class GatherSumApplyITCase extends MultipleProgramsTestBase {
    +
    +   public GatherSumApplyITCase(MultipleProgramsTestBase.ExecutionMode 
mode){
    +           super(mode);
    +   }
    +
    +   private String resultPath;
    +   private String expectedResult;
    +
    +   @Rule
    +   public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +   @Before
    +   public void before() throws Exception{
    +           resultPath = tempFolder.newFile().toURI().toString();
    +   }
    +
    +   @After
    +   public void after() throws Exception{
    +           compareResultsByLinesInMemory(expectedResult, resultPath);
    +   }
    +
    +   @Test
    +   public void testGreedyGraphColoring() throws Exception {
    +           /*
    +            * This test runs the Greedy Graph Coloring algorithm using a 
GSA iteration
    +            * The expected result is that the lowest vertex value (1) gets 
propagated to all vertices
    +            */
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +
    +           Graph<Long, Long, Long> graph = 
Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
    +                           TestGraphUtils.getLongLongEdgeData(env), env);
    +
    +           // Gather the target vertices into a one-element set
    +           GatherFunction<Long, Long, Long, HashSet<Vertex<Long,Long>>> 
gather =
    +                           new GatherFunction<Long, Long, Long, 
HashSet<Vertex<Long,Long>>>() {
    +
    +                                   @Override
    +                                   public Tuple2<Long, 
HashSet<Vertex<Long, Long>>> gather(Tuple3<Vertex<Long, Long>,
    +                                                   Edge<Long, Long>, 
Vertex<Long, Long>> triplet) {
    +
    +                                           HashSet<Vertex<Long, Long>> 
result = new HashSet<Vertex<Long, Long>>();
    +                                           result.add(triplet.f2);
    +
    +                                           return new Tuple2<Long, 
HashSet<Vertex<Long, Long>>>(triplet.f0.getId(), result);
    +                                   }
    +                           };
    +
    +           // Merge the sets between neighbors
    +           SumFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>> sum =
    +                           new SumFunction<Long, Long, Long, 
HashSet<Vertex<Long, Long>>>() {
    +
    +                                   @Override
    +                                   public Tuple2<Long, 
HashSet<Vertex<Long, Long>>> sum(
    +                                                   Tuple2<Long, 
HashSet<Vertex<Long, Long>>> arg0,
    +                                                   Tuple2<Long, 
HashSet<Vertex<Long, Long>>> arg1) {
    +
    +                                           HashSet<Vertex<Long, Long>> 
result = new HashSet<Vertex<Long, Long>>();
    +                                           result.addAll(arg0.f1);
    +                                           result.addAll(arg1.f1);
    +
    +                                           return new Tuple2<Long, 
HashSet<Vertex<Long, Long>>>(arg0.f0, result);
    +                                   }
    +                           };
    +
    +           // Find the minimum vertex id in the set which will be 
propagated
    +           ApplyFunction<Long, Long, Long, HashSet<Vertex<Long, Long>>> 
apply =
    +                           new ApplyFunction<Long, Long, Long, 
HashSet<Vertex<Long, Long>>>() {
    +
    +                                   @Override
    +                                   public void apply(Tuple2<Long, 
HashSet<Vertex<Long, Long>>> set,
    +                                                   Vertex<Long, Long> src, 
Collector<Vertex<Long, Long>> collector) {
    +                                           long minValue = src.getValue();
    +                                           for (Vertex<Long, Long> v : 
set.f1) {
    +                                                   if (v.getValue() < 
minValue) {
    +                                                           minValue = 
v.getValue();
    +                                                   }
    +                                           }
    +
    +                                           // This is the condition that 
enables the termination of the iteration
    +                                           if (minValue != src.getValue()) 
{
    +                                                   collector.collect(new 
Vertex<Long, Long>(src.getId(), minValue));
    +                                           }
    +                                   }
    +                           };
    +
    +           Graph<Long, Long, Long> minColoring = 
graph.runGatherSumApplyIteration(gather, sum, apply, 16);
    +           minColoring.getVertices().writeAsCsv(resultPath);
    +
    +           env.execute();
    +
    +           expectedResult = "1,1\n" +
    +                           "2,1\n" +
    +                           "3,1\n" +
    +                           "4,1\n" +
    +                           "5,1\n";
    +   }
    +
    +   @Test
    +   public void testSingleSourceShortestPath() throws Exception {
    +           /*
    +            * This test runs the Single Source Shortest Path algorithm 
using a GSA iteration
    +            */
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +
    +           // Start off with 0 for the source vertex and INF for all other 
vertices
    +           List<Vertex<Long, Double>> vertices = new 
ArrayList<Vertex<Long, Double>>();
    +           vertices.add(new Vertex<Long, Double>(1L, 0.0));
    +           vertices.add(new Vertex<Long, Double>(2L, 
Double.POSITIVE_INFINITY));
    +           vertices.add(new Vertex<Long, Double>(3L, 
Double.POSITIVE_INFINITY));
    +           vertices.add(new Vertex<Long, Double>(4L, 
Double.POSITIVE_INFINITY));
    +           vertices.add(new Vertex<Long, Double>(5L, 
Double.POSITIVE_INFINITY));
    +
    +           Graph<Long, Double, Long> graph = 
Graph.fromDataSet(env.fromCollection(vertices),
    +                           TestGraphUtils.getLongLongEdgeData(env), env);
    +
    +           // The path from src to trg through edge e costs src + e
    +           // If the target's distance is 0 (the target is the actual 
source), return 0 instead
    +           GatherFunction<Long, Double, Long, Double> gather =
    +                           new GatherFunction<Long, Double, Long, 
Double>() {
    +                                   @Override
    +                                   public Tuple2<Long, Double> 
gather(Tuple3<Vertex<Long, Double>,
    +                                                   Edge<Long, Long>, 
Vertex<Long, Double>> triplet) {
    +
    +                                           if (triplet.f2.getValue() == 
0.0) {
    --- End diff --
    
    Normally comparing doubles with `==` is not okay, but we never change the initial 0.0 value, so the exact comparison is safe here.
    Should we still make an epsilon check here, even though it would result in (a bit) less readable code?


> [Gelly] Add a Gather-Sum-Apply iteration method
> -----------------------------------------------
>
>                 Key: FLINK-1514
>                 URL: https://issues.apache.org/jira/browse/FLINK-1514
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: Daniel Bali
>
> This will be a method that implements the GAS computation model, but without 
> the "scatter" step. The phases can be mapped into the following steps inside 
> a delta iteration:
> gather: a map on each < srcVertex, edge, trgVertex > that produces a partial 
> value
> sum: a reduce that combines the partial values
> apply: join with vertex set to update the vertex values using the results of 
> sum and the previous state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to