[FLINK-1201] [gelly] changed pageRank example to use joinWithEdgesOnSource

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/753c71ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/753c71ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/753c71ae

Branch: refs/heads/master
Commit: 753c71ae4a40d1414c65319e783150de9cd4177e
Parents: e0c10ec
Author: vasia <vasilikikala...@gmail.com>
Authored: Sat Jan 10 18:32:01 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 90 +++++++++-----------
 .../flink/graph/example/PageRankExample.java    | 18 +++-
 .../flink/graph/test/TestJoinWithEdges.java     |  3 +-
 3 files changed, 58 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 71a701b..51b8c30 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -88,7 +88,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         * @return
         */
        public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
-
                return validator.validate(this);
        }
 
@@ -184,39 +183,39 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         * @return - a new graph where the vertex values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> 
inputDataSet,
-                                                                               
                 final MapFunction<Tuple2<VV, T>, VV> mapper) {
+                       final MapFunction<Tuple2<VV, T>, VV> mapper) {
 
                DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new ApplyCoGroupToVertexValues<K, VV, 
T>(mapper));
-
                return Graph.create(resultedVertices, this.getEdges(), 
this.getContext());
        }
 
        private static final class ApplyCoGroupToVertexValues<K extends 
Comparable<K> & Serializable,
-                       VV extends Serializable, T>
-                       implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, 
Vertex<K, VV>> {
+                       VV extends Serializable, T>     implements 
CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
 
                private MapFunction<Tuple2<VV, T>, VV> mapper;
-
                public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, 
VV> mapper) {
                        this.mapper = mapper;
                }
 
                @Override
-               public void coGroup(Iterable<Vertex<K, VV>> iterableDS1, 
Iterable<Tuple2<K, T>> iterableDS2,
+               public void coGroup(Iterable<Vertex<K, VV>> vertices, 
Iterable<Tuple2<K, T>> input,
                                                        Collector<Vertex<K, 
VV>> collector) throws Exception {
 
-                       Iterator<Vertex<K, VV>> iteratorDS1 = 
iterableDS1.iterator();
-                       Iterator<Tuple2<K, T>> iteratorDS2 = 
iterableDS2.iterator();
+                       final Iterator<Vertex<K, VV>> vertexIterator = 
vertices.iterator();
+                       final Iterator<Tuple2<K, T>> inputIterator = 
input.iterator();
 
-                       if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) {
-                               Tuple2<K, T> iteratorDS2Next = 
iteratorDS2.next();
+                       if (vertexIterator.hasNext()) {
+                               if(inputIterator.hasNext()) {
+                                       final Tuple2<K, T> inputNext = 
inputIterator.next();
 
-                               collector.collect(new Vertex<K, 
VV>(iteratorDS2Next.f0, mapper
-                                               .map(new Tuple2<VV, 
T>(iteratorDS1.next().f1, iteratorDS2Next.f1))));
-                       } else if(iteratorDS1.hasNext()) {
-                               collector.collect(iteratorDS1.next());
+                                       collector.collect(new Vertex<K, 
VV>(inputNext.f0, mapper
+                                                       .map(new Tuple2<VV, 
T>(vertexIterator.next().f1, inputNext.f1))));
+                               } else {
+                                       
collector.collect(vertexIterator.next());
+                               }
+                               
                        }
                }
        }
@@ -229,13 +228,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         * @param <T>
         * @return - a new graph where the edge values have been updated.
         */
-       public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet,
-                                                                               
          final MapFunction<Tuple2<EV, T>, EV> mapper) {
+       public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet, 
+                       final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(0,1).equalTo(0,1)
                                .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(mapper));
-
                return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
        }
 
@@ -244,27 +242,27 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
                        implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, 
T>, Edge<K, EV>> {
 
                private MapFunction<Tuple2<EV, T>, EV> mapper;
-
                public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> 
mapper) {
                        this.mapper = mapper;
                }
 
                @Override
-               public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
-                                                       Iterable<Tuple3<K, K, 
T>> iterableDS2,
+               public void coGroup(Iterable<Edge<K, EV>> edges,
+                                                       Iterable<Tuple3<K, K, 
T>> input,
                                                        Collector<Edge<K, EV>> 
collector) throws Exception {
 
-                       Iterator<Edge<K, EV>> iteratorDS1 = 
iterableDS1.iterator();
-                       Iterator<Tuple3<K, K, T>> iteratorDS2 = 
iterableDS2.iterator();
+                       final Iterator<Edge<K, EV>> edgesIterator = 
edges.iterator();
+                       final Iterator<Tuple3<K, K, T>> inputIterator = 
input.iterator();
 
-                       if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) {
-                               Tuple3<K, K, T> iteratorDS2Next = 
iteratorDS2.next();
+                       if (edgesIterator.hasNext()) {
+                               if(inputIterator.hasNext()) {
+                                       final Tuple3<K, K, T> inputNext = 
inputIterator.next();
 
-                               collector.collect(new Edge<K, 
EV>(iteratorDS2Next.f0, iteratorDS2Next.f1, mapper
-                                               .map(new Tuple2<EV, 
T>(iteratorDS1.next().f2, iteratorDS2Next.f2))));
-
-                       } else if(iteratorDS1.hasNext()) {
-                               collector.collect(iteratorDS1.next());
+                                       collector.collect(new Edge<K, 
EV>(inputNext.f0, inputNext.f1, mapper
+                                                       .map(new Tuple2<EV, 
T>(edgesIterator.next().f2, inputNext.f2))));
+                               } else {
+                                       collector.collect(edgesIterator.next());
+                               }
                        }
                }
        }
@@ -279,7 +277,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         * @return - a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> 
inputDataSet,
-                                                                               
                 final MapFunction<Tuple2<EV, T>, EV> mapper) {
+                       final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(0).equalTo(0)
@@ -289,37 +287,33 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
        }
 
        private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & 
Serializable,
-                       EV extends Serializable, T>
-                       implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, 
Edge<K, EV>> {
+                       EV extends Serializable, T>     implements 
CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
 
                private MapFunction<Tuple2<EV, T>, EV> mapper;
-
                public 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> 
mapper) {
                        this.mapper = mapper;
                }
 
-
                @Override
-               public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
-                                                       Iterable<Tuple2<K, T>> 
iterableDS2,
+               public void coGroup(Iterable<Edge<K, EV>> edges, 
Iterable<Tuple2<K, T>> input,
                                                        Collector<Edge<K, EV>> 
collector) throws Exception {
 
-                       Iterator<Edge<K, EV>> iteratorDS1 = 
iterableDS1.iterator();
-                       Iterator<Tuple2<K, T>> iteratorDS2 = 
iterableDS2.iterator();
+                       final Iterator<Edge<K, EV>> edgesIterator = 
edges.iterator();
+                       final Iterator<Tuple2<K, T>> inputIterator = 
input.iterator();
 
-                       if(iteratorDS2.hasNext()) {
-                               Tuple2<K, T> iteratorDS2Next = 
iteratorDS2.next();
+                       if(inputIterator.hasNext()) {
+                               final Tuple2<K, T> inputNext = 
inputIterator.next();
 
-                               while(iteratorDS1.hasNext()) {
-                                       Edge<K, EV> iteratorDS1Next = 
iteratorDS1.next();
+                               while(edgesIterator.hasNext()) {
+                                       Edge<K, EV> edgesNext = 
edgesIterator.next();
 
-                                       collector.collect(new Edge<K, 
EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper
-                                                       .map(new Tuple2<EV, 
T>(iteratorDS1Next.f2, iteratorDS2Next.f1))));
+                                       collector.collect(new Edge<K, 
EV>(edgesNext.f0, edgesNext.f1, mapper
+                                                       .map(new Tuple2<EV, 
T>(edgesNext.f2, inputNext.f1))));
                                }
 
                        } else {
-                               while(iteratorDS1.hasNext()) {
-                                       collector.collect(iteratorDS1.next());
+                               while(edgesIterator.hasNext()) {
+                                       collector.collect(edgesIterator.next());
                                }
                        }
                }
@@ -335,7 +329,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
         * @return - a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> 
inputDataSet,
-                                                                               
                          final MapFunction<Tuple2<EV, T>, EV> mapper) {
+                       final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(1).equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index 0fc8084..e3f815a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -8,11 +8,13 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.*;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
 public class PageRankExample implements ProgramDescription {
 
-    public static void main (String [] args) throws Exception {
+    @SuppressWarnings("serial")
+       public static void main (String [] args) throws Exception {
 
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
@@ -21,9 +23,19 @@ public class PageRankExample implements ProgramDescription {
         DataSet<Edge<Long,Double>> links = getLinksDataSet(env);
 
         Graph<Long, Double, Double> network = new Graph<Long, Double, 
Double>(pages, links, env);
+        
+        DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+        
+        // assign the transition probabilities as the edge weights
+        Graph<Long, Double, Double> networkWithWeights = 
network.joinWithEdgesOnSource(vertexOutDegrees, 
+                       new MapFunction<Tuple2<Double, Long>, Double>() {
+                                       public Double map(Tuple2<Double, Long> 
value) {
+                                               return value.f0 / value.f1;
+                                       }
+                               });
 
         DataSet<Vertex<Long,Double>> pageRanks =
-                network.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, 
maxIterations)).getVertices();
+                       networkWithWeights.run(new PageRank<Long>(numPages, 
DAMPENING_FACTOR, maxIterations)).getVertices();
 
         pageRanks.print();
 
@@ -60,7 +72,7 @@ public class PageRankExample implements ProgramDescription {
                             int numOutEdges = (int) (Math.random() * (numPages 
/ 2));
                             for (int i = 0; i < numOutEdges; i++) {
                                 long target = (long) (Math.random() * 
numPages) + 1;
-                                out.collect(new Edge<Long, Double>(key, 
target, 1.0 / numOutEdges));
+                                out.collect(new Edge<Long, Double>(key, 
target, 1.0));
                             }
                         }
                     });

http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
index 711cd61..7375d0c 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -580,5 +580,4 @@ public class TestJoinWithEdges extends JavaProgramTestBase {
             }
         }
     }
-}
-
+}
\ No newline at end of file

Reply via email to