[FLINK-1201] [gelly] Changed the PageRank example to use joinWithEdgesOnSource
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/753c71ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/753c71ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/753c71ae Branch: refs/heads/master Commit: 753c71ae4a40d1414c65319e783150de9cd4177e Parents: e0c10ec Author: vasia <vasilikikala...@gmail.com> Authored: Sat Jan 10 18:32:01 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:14 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 90 +++++++++----------- .../flink/graph/example/PageRankExample.java | 18 +++- .../flink/graph/test/TestJoinWithEdges.java | 3 +- 3 files changed, 58 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 71a701b..51b8c30 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -88,7 +88,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return */ public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) { - return validator.validate(this); } @@ -184,39 +183,39 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return - a new graph where the vertex values have been updated. 
*/ public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, - final MapFunction<Tuple2<VV, T>, VV> mapper) { + final MapFunction<Tuple2<VV, T>, VV> mapper) { DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper)); - return Graph.create(resultedVertices, this.getEdges(), this.getContext()); } private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, - VV extends Serializable, T> - implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { + VV extends Serializable, T> implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> { private MapFunction<Tuple2<VV, T>, VV> mapper; - public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) { this.mapper = mapper; } @Override - public void coGroup(Iterable<Vertex<K, VV>> iterableDS1, Iterable<Tuple2<K, T>> iterableDS2, + public void coGroup(Iterable<Vertex<K, VV>> vertices, Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception { - Iterator<Vertex<K, VV>> iteratorDS1 = iterableDS1.iterator(); - Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator(); + final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator(); + final Iterator<Tuple2<K, T>> inputIterator = input.iterator(); - if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) { - Tuple2<K, T> iteratorDS2Next = iteratorDS2.next(); + if (vertexIterator.hasNext()) { + if(inputIterator.hasNext()) { + final Tuple2<K, T> inputNext = inputIterator.next(); - collector.collect(new Vertex<K, VV>(iteratorDS2Next.f0, mapper - .map(new Tuple2<VV, T>(iteratorDS1.next().f1, iteratorDS2Next.f1)))); - } else if(iteratorDS1.hasNext()) { - collector.collect(iteratorDS1.next()); + collector.collect(new Vertex<K, VV>(inputNext.f0, mapper + .map(new Tuple2<VV, T>(vertexIterator.next().f1, inputNext.f1)))); + } else { + 
collector.collect(vertexIterator.next()); + } + } } } @@ -229,13 +228,12 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @param <T> * @return - a new graph where the edge values have been updated. */ - public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, - final MapFunction<Tuple2<EV, T>, EV> mapper) { + public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet, + final MapFunction<Tuple2<EV, T>, EV> mapper) { DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0,1).equalTo(0,1) .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper)); - return Graph.create(this.getVertices(), resultedEdges, this.getContext()); } @@ -244,27 +242,27 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> { private MapFunction<Tuple2<EV, T>, EV> mapper; - public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) { this.mapper = mapper; } @Override - public void coGroup(Iterable<Edge<K, EV>> iterableDS1, - Iterable<Tuple3<K, K, T>> iterableDS2, + public void coGroup(Iterable<Edge<K, EV>> edges, + Iterable<Tuple3<K, K, T>> input, Collector<Edge<K, EV>> collector) throws Exception { - Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator(); - Iterator<Tuple3<K, K, T>> iteratorDS2 = iterableDS2.iterator(); + final Iterator<Edge<K, EV>> edgesIterator = edges.iterator(); + final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator(); - if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) { - Tuple3<K, K, T> iteratorDS2Next = iteratorDS2.next(); + if (edgesIterator.hasNext()) { + if(inputIterator.hasNext()) { + final Tuple3<K, K, T> inputNext = inputIterator.next(); - collector.collect(new Edge<K, EV>(iteratorDS2Next.f0, iteratorDS2Next.f1, mapper - .map(new Tuple2<EV, T>(iteratorDS1.next().f2, iteratorDS2Next.f2)))); - - } else if(iteratorDS1.hasNext()) { - 
collector.collect(iteratorDS1.next()); + collector.collect(new Edge<K, EV>(inputNext.f0, inputNext.f1, mapper + .map(new Tuple2<EV, T>(edgesIterator.next().f2, inputNext.f2)))); + } else { + collector.collect(edgesIterator.next()); + } } } } @@ -279,7 +277,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return - a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet, - final MapFunction<Tuple2<EV, T>, EV> mapper) { + final MapFunction<Tuple2<EV, T>, EV> mapper) { DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0).equalTo(0) @@ -289,37 +287,33 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, - EV extends Serializable, T> - implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { + EV extends Serializable, T> implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> { private MapFunction<Tuple2<EV, T>, EV> mapper; - public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> mapper) { this.mapper = mapper; } - @Override - public void coGroup(Iterable<Edge<K, EV>> iterableDS1, - Iterable<Tuple2<K, T>> iterableDS2, + public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception { - Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator(); - Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator(); + final Iterator<Edge<K, EV>> edgesIterator = edges.iterator(); + final Iterator<Tuple2<K, T>> inputIterator = input.iterator(); - if(iteratorDS2.hasNext()) { - Tuple2<K, T> iteratorDS2Next = iteratorDS2.next(); + if(inputIterator.hasNext()) { + final Tuple2<K, T> inputNext = inputIterator.next(); - while(iteratorDS1.hasNext()) { - Edge<K, EV> 
iteratorDS1Next = iteratorDS1.next(); + while(edgesIterator.hasNext()) { + Edge<K, EV> edgesNext = edgesIterator.next(); - collector.collect(new Edge<K, EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper - .map(new Tuple2<EV, T>(iteratorDS1Next.f2, iteratorDS2Next.f1)))); + collector.collect(new Edge<K, EV>(edgesNext.f0, edgesNext.f1, mapper + .map(new Tuple2<EV, T>(edgesNext.f2, inputNext.f1)))); } } else { - while(iteratorDS1.hasNext()) { - collector.collect(iteratorDS1.next()); + while(edgesIterator.hasNext()) { + collector.collect(edgesIterator.next()); } } } @@ -335,7 +329,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return - a new graph where the edge values have been updated. */ public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet, - final MapFunction<Tuple2<EV, T>, EV> mapper) { + final MapFunction<Tuple2<EV, T>, EV> mapper) { DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(1).equalTo(0) http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java index 0fc8084..e3f815a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java @@ -8,11 +8,13 @@ import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.*; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class 
PageRankExample implements ProgramDescription { - public static void main (String [] args) throws Exception { + @SuppressWarnings("serial") + public static void main (String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -21,9 +23,19 @@ public class PageRankExample implements ProgramDescription { DataSet<Edge<Long,Double>> links = getLinksDataSet(env); Graph<Long, Double, Double> network = new Graph<Long, Double, Double>(pages, links, env); + + DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees(); + + // assign the transition probabilities as the edge weights + Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, + new MapFunction<Tuple2<Double, Long>, Double>() { + public Double map(Tuple2<Double, Long> value) { + return value.f0 / value.f1; + } + }); DataSet<Vertex<Long,Double>> pageRanks = - network.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices(); + networkWithWeights.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices(); pageRanks.print(); @@ -60,7 +72,7 @@ public class PageRankExample implements ProgramDescription { int numOutEdges = (int) (Math.random() * (numPages / 2)); for (int i = 0; i < numOutEdges; i++) { long target = (long) (Math.random() * numPages) + 1; - out.collect(new Edge<Long, Double>(key, target, 1.0 / numOutEdges)); + out.collect(new Edge<Long, Double>(key, target, 1.0)); } } }); http://git-wip-us.apache.org/repos/asf/flink/blob/753c71ae/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java index 711cd61..7375d0c 100644 --- 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java @@ -580,5 +580,4 @@ public class TestJoinWithEdges extends JavaProgramTestBase { } } } -} - +} \ No newline at end of file