http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java index 50c9ae5..c490bb3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java @@ -1,71 +1,94 @@ -package flink.graphs.example; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import flink.graphs.*; -import flink.graphs.library.LabelPropagation; +package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.*; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.LabelPropagation; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; /** - * This example uses the label propagation algorithm to detect communities by propagating labels. - * Initially, each vertex is assigned its id as its label. - * The vertices iteratively propagate their labels to their neighbors and adopt the most frequent label - * among their neighbors. - * The algorithm converges when no vertex changes value or the maximum number of iterations have been reached. + * This example uses the label propagation algorithm to detect communities by + * propagating labels. Initially, each vertex is assigned its id as its label. + * The vertices iteratively propagate their labels to their neighbors and adopt + * the most frequent label among their neighbors. The algorithm converges when + * no vertex changes value or the maximum number of iterations have been + * reached. */ public class LabelPropagationExample implements ProgramDescription { - public static void main (String [] args) throws Exception { + public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env); - DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); + DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env); + DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env); - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env); + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Vertex<Long, Long>> verticesWithCommunity = - graph.run(new LabelPropagation<Long>(maxIterations)).getVertices(); + DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run( + new LabelPropagation<Long>(maxIterations)).getVertices(); - verticesWithCommunity.print(); + verticesWithCommunity.print(); - env.execute(); - } + env.execute(); + } - @Override - public String getDescription() { - return "Label Propagation Example"; - } + @Override + public String getDescription() { + return "Label Propagation Example"; + } - private static long numVertices = 100; - private static int maxIterations = 20; + private static long numVertices = 100; + private static int maxIterations = 20; @SuppressWarnings("serial") private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) { - return env.generateSequence(1, numVertices) - .map(new MapFunction<Long, Vertex<Long, Long>>() { - public Vertex<Long, Long> map(Long l) throws Exception { - return new Vertex<Long, Long>(l, l); - } - }); - } + return env.generateSequence(1, numVertices).map( + new MapFunction<Long, Vertex<Long, Long>>() { + public Vertex<Long, Long> map(Long l) throws Exception { + return new Vertex<Long, Long>(l, l); + } + }); + } - @SuppressWarnings("serial") + @SuppressWarnings("serial") private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) { - return env.generateSequence(1, numVertices) - .flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() { - @Override - public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) { - int numOutEdges = (int) (Math.random() * (numVertices / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numVertices) + 1; - out.collect(new Edge<Long, NullValue>(key, target, NullValue.getInstance())); - } - } - }); - } + return env.generateSequence(1, numVertices).flatMap( + new FlatMapFunction<Long, Edge<Long, NullValue>>() { + @Override + public void flatMap(Long key, + Collector<Edge<Long, NullValue>> out) { + int numOutEdges = (int) (Math.random() * (numVertices / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numVertices) + 1; + out.collect(new Edge<Long, NullValue>(key, target, + NullValue.getInstance())); + } + } + }); + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index 668c765..948ac5b 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -1,4 +1,22 @@ -package flink.graphs.example; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; import java.util.ArrayList; import java.util.List; @@ -13,176 +31,186 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunctionWithVertexValue; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.MusicProfilesData; +import org.apache.flink.graph.library.LabelPropagation; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; -import flink.graphs.Edge; -import flink.graphs.EdgeDirection; -import flink.graphs.EdgesFunctionWithVertexValue; -import flink.graphs.Graph; -import flink.graphs.Vertex; -import flink.graphs.example.utils.MusicProfilesData; -import flink.graphs.library.LabelPropagation; - @SuppressWarnings("serial") public class MusicProfiles implements ProgramDescription { /** - * This example demonstrates how to mix the "record" Flink API with the graph API. - * The input is a set <userId - songId - playCount> triplets and a set of - * bad records,i.e. song ids that should not be trusted. - * Initially, we use the record API to filter out the bad records. - * Then, we use the graph API to create a user -> song weighted bipartite graph - * and compute the top song (most listened) per user. - * Then, we use the record API again, to create a user-user similarity graph, - * based on common songs, where two users that listen to the same song are connected. - * Finally, we use the graph API to run the label propagation community detection algorithm - * on the similarity graph. + * This example demonstrates how to mix the "record" Flink API with the + * graph API. The input is a set <userId - songId - playCount> triplets and + * a set of bad records,i.e. song ids that should not be trusted. Initially, + * we use the record API to filter out the bad records. Then, we use the + * graph API to create a user -> song weighted bipartite graph and compute + * the top song (most listened) per user. Then, we use the record API again, + * to create a user-user similarity graph, based on common songs, where two + * users that listen to the same song are connected. Finally, we use the + * graph API to run the label propagation community detection algorithm on + * the similarity graph. */ - public static void main (String [] args) throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - final int numIterations = 10; - - /** - * Read the user-song-play triplets - * The format is <userID>\t<songID>\t<playcount> - */ - DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env); - - /** - * Read the mismatches dataset and extract the songIDs - * The format is "ERROR: <songID trackID> song_title" - */ - DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds()); - - /** - * Filter out the mismatches from the triplets dataset - */ - DataSet<Tuple3<String, String, Integer>> validTriplets = triplets.coGroup(mismatches) - .where(1).equalTo(0).with(new FilterOutMismatches()); - - /** - * Create a user -> song weighted bipartite graph - * where the edge weights correspond to play counts - */ - Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env); - - /** - * Get the top track (most listened) for each user - */ - DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph.reduceOnEdges(new GetTopSongPerUser(), - EdgeDirection.OUT).filter(new FilterSongNodes()); - - usersWithTopTrack.print(); - - /** - * Create a user-user similarity graph, based on common songs, - * i.e. two users that listen to the same song are connected. - * For each song, we create an edge between each pair of its in-neighbors. - */ - DataSet<Edge<String, NullValue>> similarUsers = userSongGraph.getEdges().groupBy(1) - .reduceGroup(new CreateSimilarUserEdges()).distinct(); - - Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, - - new MapFunction<String, Long>() { - public Long map(String value) { return 1l; } - - }, env).getUndirected(); - - /** - * Detect user communities using the label propagation library method - */ - - // Initialize each vertex with a unique numeric label - DataSet<Tuple2<String, Long>> idsWithInitialLabels = similarUsersGraph.getVertices() - .reduceGroup(new AssignInitialLabelReducer()); - - // update the vertex values and run the label propagation algorithm - DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph.joinWithVertices(idsWithInitialLabels, - new MapFunction<Tuple2<Long, Long>, Long>() { - public Long map(Tuple2<Long, Long> value) { return value.f1; } - }) - .run(new LabelPropagation<String>(numIterations)).getVertices(); - - verticesWithCommunity.print(); - - env.execute(); - } + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final int numIterations = 10; + + /** + * Read the user-song-play triplets The format is + * <userID>\t<songID>\t<playcount> + */ + DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env); + + /** + * Read the mismatches dataset and extract the songIDs The format is + * "ERROR: <songID trackID> song_title" + */ + DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds()); + + /** + * Filter out the mismatches from the triplets dataset + */ + DataSet<Tuple3<String, String, Integer>> validTriplets = triplets + .coGroup(mismatches).where(1).equalTo(0) + .with(new FilterOutMismatches()); + + /** + * Create a user -> song weighted bipartite graph where the edge weights + * correspond to play counts + */ + Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env); + + /** + * Get the top track (most listened) for each user + */ + DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph + .reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT) + .filter(new FilterSongNodes()); + + usersWithTopTrack.print(); + + /** + * Create a user-user similarity graph, based on common songs, i.e. two + * users that listen to the same song are connected. For each song, we + * create an edge between each pair of its in-neighbors. + */ + DataSet<Edge<String, NullValue>> similarUsers = userSongGraph + .getEdges().groupBy(1) + .reduceGroup(new CreateSimilarUserEdges()).distinct(); + + Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers, + new MapFunction<String, Long>() { + public Long map(String value) { + return 1l; + } + }, env).getUndirected(); + + /** + * Detect user communities using the label propagation library method + */ + + // Initialize each vertex with a unique numeric label + DataSet<Tuple2<String, Long>> idsWithInitialLabels = similarUsersGraph + .getVertices().reduceGroup(new AssignInitialLabelReducer()); + + // update the vertex values and run the label propagation algorithm + DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph + .joinWithVertices(idsWithInitialLabels, + new MapFunction<Tuple2<Long, Long>, Long>() { + public Long map(Tuple2<Long, Long> value) { + return value.f1; + } + }).run(new LabelPropagation<String>(numIterations)) + .getVertices(); + + verticesWithCommunity.print(); + + env.execute(); + } public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> { + public Tuple1<String> map(String value) { - String[] tokens = value.split("\\s+"); + String[] tokens = value.split("\\s+"); String songId = tokens[1].substring(1); return new Tuple1<String>(songId); } - } - - public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, - Tuple1<String>, Tuple3<String, String, Integer>> { - public void coGroup( - Iterable<Tuple3<String, String, Integer>> triplets, - Iterable<Tuple1<String>> invalidSongs, - Collector<Tuple3<String, String, Integer>> out) { + } + + public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, + Tuple1<String>, Tuple3<String, String, Integer>> { + + public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets, + Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) { + if (!invalidSongs.iterator().hasNext()) { // this is a valid triplet for (Tuple3<String, String, Integer> triplet : triplets) { - out.collect(triplet); + out.collect(triplet); } } } - } + } - public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> { + public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> { public boolean filter(Tuple2<String, String> value) throws Exception { return !value.f1.equals(""); } - } + } - public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue - <String, NullValue, Integer, Tuple2<String, String>> { - public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex, + public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue<String, NullValue, Integer, + Tuple2<String, String>> { + + public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex, Iterable<Edge<String, Integer>> edges) { + int maxPlaycount = 0; String topSong = ""; - for (Edge<String, Integer> edge: edges) { + for (Edge<String, Integer> edge : edges) { if (edge.getValue() > maxPlaycount) { maxPlaycount = edge.getValue(); topSong = edge.getTarget(); } } - return new Tuple2<String, String> (vertex.getId(), topSong); + return new Tuple2<String, String>(vertex.getId(), topSong); } - } + } + + public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>, + Edge<String, NullValue>> { - public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>, - Edge<String, NullValue>> { public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) { List<String> listeners = new ArrayList<String>(); for (Edge<String, Integer> edge : edges) { listeners.add(edge.getSource()); } - for (int i=0; i < listeners.size()-1; i++) { - out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(i+1), - NullValue.getInstance())); + for (int i = 0; i < listeners.size() - 1; i++) { + out.collect(new Edge<String, NullValue>(listeners.get(i), + listeners.get(i + 1), NullValue.getInstance())); } } - } + } - public static final class AssignInitialLabelReducer implements GroupReduceFunction<Vertex<String, Long>, - Tuple2<String, Long>> { - public void reduce(Iterable<Vertex<String, Long>> vertices, Collector<Tuple2<String, Long>> out) { + public static final class AssignInitialLabelReducer implements GroupReduceFunction<Vertex<String, Long>, + Tuple2<String, Long>> { + + public void reduce(Iterable<Vertex<String, Long>> vertices, Collector<Tuple2<String, Long>> out) { long label = 0; for (Vertex<String, Long> vertex : vertices) { out.collect(new Tuple2<String, Long>(vertex.getId(), label)); label++; } } - } + } @Override public String getDescription() { return "Music Profiles Example"; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java index e3f815a..400508c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java @@ -1,80 +1,102 @@ -package flink.graphs.example; - - -import flink.graphs.*; -import flink.graphs.library.PageRank; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.*; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.library.PageRank; import org.apache.flink.util.Collector; public class PageRankExample implements ProgramDescription { - @SuppressWarnings("serial") - public static void main (String [] args) throws Exception { + @SuppressWarnings("serial") + public static void main(String[] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Vertex<Long, Double>> pages = getPagesDataSet(env); + + DataSet<Edge<Long, Double>> links = getLinksDataSet(env); + + Graph<Long, Double, Double> network = Graph.fromDataSet(pages, links, env); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees(); - DataSet<Vertex<Long,Double>> pages = getPagesDataSet(env); + // assign the transition probabilities as the edge weights + Graph<Long, Double, Double> networkWithWeights = network + .joinWithEdgesOnSource(vertexOutDegrees, + new MapFunction<Tuple2<Double, Long>, Double>() { + public Double map(Tuple2<Double, Long> value) { + return value.f0 / value.f1; + } + }); - DataSet<Edge<Long,Double>> links = getLinksDataSet(env); + DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run( + new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)) + .getVertices(); - Graph<Long, Double, Double> network = new Graph<Long, Double, Double>(pages, links, env); - - DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees(); - - // assign the transition probabilities as the edge weights - Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, - new MapFunction<Tuple2<Double, Long>, Double>() { - public Double map(Tuple2<Double, Long> value) { - return value.f0 / value.f1; + pageRanks.print(); + + env.execute(); + } + + @Override + public String getDescription() { + return "PageRank"; + } + + private static final double DAMPENING_FACTOR = 0.85; + private static long numPages = 10; + private static int maxIterations = 10; + + @SuppressWarnings("serial") + private static DataSet<Vertex<Long, Double>> getPagesDataSet(ExecutionEnvironment env) { + return env.generateSequence(1, numPages).map( + new MapFunction<Long, Vertex<Long, Double>>() { + @Override + public Vertex<Long, Double> map(Long l) throws Exception { + return new Vertex<Long, Double>(l, 1.0 / numPages); } }); - DataSet<Vertex<Long,Double>> pageRanks = - networkWithWeights.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices(); - - pageRanks.print(); - - env.execute(); - } - - @Override - public String getDescription() { - return "PageRank"; - } - - private static final double DAMPENING_FACTOR = 0.85; - private static long numPages = 10; - private static int maxIterations = 10; - - @SuppressWarnings("serial") - private static DataSet<Vertex<Long,Double>> getPagesDataSet(ExecutionEnvironment env) { - return env.generateSequence(1, numPages) - .map(new MapFunction<Long, Vertex<Long, Double>>() { - @Override - public Vertex<Long, Double> map(Long l) throws Exception { - return new Vertex<Long, Double>(l, 1.0 / numPages); - } - }); - - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) { - return env.generateSequence(1, numPages) - .flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() { - @Override - public void flatMap(Long key, Collector<Edge<Long, Double>> out) throws Exception { - int numOutEdges = (int) (Math.random() * (numPages / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numPages) + 1; - out.collect(new Edge<Long, Double>(key, target, 1.0)); - } - } - }); - } + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) { + return env.generateSequence(1, numPages).flatMap( + new FlatMapFunction<Long, Edge<Long, Double>>() { + @Override + public void flatMap(Long key, + Collector<Edge<Long, Double>> out) throws Exception { + int numOutEdges = (int) (Math.random() * (numPages / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numPages) + 1; + out.collect(new Edge<Long, Double>(key, target, 1.0)); + } + } + }); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java index 75e33dc..7f31525 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java @@ -1,41 +1,59 @@ -package flink.graphs.example; - -import flink.graphs.Edge; -import flink.graphs.Graph; -import flink.graphs.Vertex; -import flink.graphs.example.utils.ExampleUtils; -import flink.graphs.library.SingleSourceShortestPaths; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.ExampleUtils; +import org.apache.flink.graph.library.SingleSourceShortestPaths; public class SingleSourceShortestPathsExample implements ProgramDescription { - private static int maxIterations = 5; + private static int maxIterations = 5; - public static void main (String [] args) throws Exception { + public static void main(String[] args) throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env); + DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env); - DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env); + DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env); - Long srcVertexId = 1L; + Long srcVertexId = 1L; - Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env); + Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Vertex<Long,Double>> singleSourceShortestPaths = - graph.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations)).getVertices(); + DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph + .run(new SingleSourceShortestPaths<Long>(srcVertexId, + maxIterations)).getVertices(); - singleSourceShortestPaths.print(); + singleSourceShortestPaths.print(); - env.execute(); - } + env.execute(); + } - @Override - public String getDescription() { - return "Single Source Shortest Paths"; - } + @Override + public String getDescription() { + return "Single Source Shortest Paths"; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java index 8c131e4..3f1c7bb 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java @@ -1,10 +1,27 @@ -package flink.graphs.example.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.graph.Edge; import org.apache.flink.types.NullValue; -import flink.graphs.Edge; - public class EdgeWithLongIdNullValueParser extends RichMapFunction<String, Edge<Long, NullValue>> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java index 4588230..d986478 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java @@ -1,4 +1,22 @@ -package flink.graphs.example.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; import java.io.PrintStream; import java.util.ArrayList; @@ -10,46 +28,47 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; -import flink.graphs.Edge; -import flink.graphs.Vertex; - public class ExampleUtils { @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) - public static void printResult(DataSet set, String msg, ExecutionEnvironment env) { + public static void printResult(DataSet set, String msg) { set.output(new PrintingOutputFormatWithMessage(msg) { }); } - - public static class PrintingOutputFormatWithMessage<T> implements OutputFormat<T> { + + public static class PrintingOutputFormatWithMessage<T> implements + OutputFormat<T> { private static final long serialVersionUID = 1L; - + private transient PrintStream stream; - + private transient String prefix; - + private String message; - + // -------------------------------------------------------------------------------------------- - + /** * Instantiates a printing output format that prints to standard out. */ - public PrintingOutputFormatWithMessage() {} - + public PrintingOutputFormatWithMessage() { + } + public PrintingOutputFormatWithMessage(String msg) { - this.message = msg; + this.message = msg; } @Override public void open(int taskNumber, int numTasks) { // get the target stream this.stream = System.out; - + // set the prefix to message this.prefix = message + ": "; } @@ -58,8 +77,7 @@ public class ExampleUtils { public void writeRecord(T record) { if (this.prefix != null) { this.stream.println(this.prefix + record.toString()); - } - else { + } else { this.stream.println(record.toString()); } } @@ -69,41 +87,46 @@ public class ExampleUtils { this.stream = null; this.prefix = null; } - + @Override public String toString() { return "Print to System.out"; } @Override - public void configure(Configuration parameters) {} + public void configure(Configuration parameters) { + } } @SuppressWarnings("serial") - public static DataSet<Vertex<Long, NullValue>> getVertexIds(ExecutionEnvironment env, - final long numVertices) { - return env.generateSequence(1, numVertices) - .map(new MapFunction<Long, Vertex<Long, NullValue>>() { - public Vertex<Long, NullValue> map(Long l) { - return new Vertex<Long, NullValue>(l, NullValue.getInstance()); - } - }); + public static DataSet<Vertex<Long, NullValue>> getVertexIds( + ExecutionEnvironment env, final long numVertices) { + return env.generateSequence(1, numVertices).map( + new MapFunction<Long, Vertex<Long, NullValue>>() { + public Vertex<Long, NullValue> map(Long l) { + return new Vertex<Long, NullValue>(l, NullValue + .getInstance()); + } + }); } @SuppressWarnings("serial") - public static DataSet<Edge<Long, NullValue>> getRandomEdges(ExecutionEnvironment env, - final long numVertices) { - return env.generateSequence(1, numVertices) - .flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() { - @Override - public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception { - int numOutEdges = (int) (Math.random() * (numVertices / 2)); - for (int i = 0; i < numOutEdges; i++) { - long target = (long) (Math.random() * numVertices) + 1; - out.collect(new Edge<Long, NullValue>(key, target, NullValue.getInstance())); - } - } - }); + public static DataSet<Edge<Long, NullValue>> getRandomEdges( + ExecutionEnvironment env, final long numVertices) { + return env.generateSequence(1, numVertices).flatMap( + new FlatMapFunction<Long, Edge<Long, NullValue>>() { + @Override + public void flatMap(Long key, + Collector<Edge<Long, NullValue>> out) + throws Exception { + int numOutEdges = (int) (Math.random() * (numVertices / 2)); + for (int i = 0; i < numOutEdges; i++) { + long target = (long) (Math.random() * numVertices) + 1; + out.collect(new Edge<Long, NullValue>(key, target, + NullValue.getInstance())); + } + } + }); } public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData( @@ -117,7 +140,7 @@ public class ExampleUtils { return env.fromCollection(vertices); } - + public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData( ExecutionEnvironment env) { List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); @@ -128,8 +151,7 @@ public class ExampleUtils { edges.add(new Edge<Long, Double>(3L, 5L, 35.0)); edges.add(new Edge<Long, Double>(4L, 5L, 45.0)); edges.add(new Edge<Long, Double>(5L, 1L, 51.0)); - + return env.fromCollection(edges); } } - http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java index cfe9c88..0a7162d 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java @@ -1,4 +1,22 @@ -package flink.graphs.example.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; import java.util.ArrayList; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index fc77d0b..69d7713 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -1,10 +1,28 @@ -package flink.graphs.library; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import flink.graphs.*; -import flink.graphs.spargel.MessageIterator; -import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.VertexUpdateFunction; +package org.apache.flink.graph.library; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.types.NullValue; import java.io.Serializable; @@ -13,43 +31,43 @@ import java.util.Map; import java.util.Map.Entry; /** - * An implementation of the label propagation algorithm. - * The iterative algorithm detects communities by propagating labels. - * In each iteration, a vertex adopts the label that is most frequent among its neighbors' labels. - * Labels are represented by Longs and we assume a total ordering among them, in order to break ties. - * The algorithm converges when no vertex changes its value or the maximum number of iterations have been reached. - * Note that different initializations might lead to different results. - * + * An implementation of the label propagation algorithm. The iterative algorithm + * detects communities by propagating labels. In each iteration, a vertex adopts + * the label that is most frequent among its neighbors' labels. Labels are + * represented by Longs and we assume a total ordering among them, in order to + * break ties. The algorithm converges when no vertex changes its value or the + * maximum number of iterations have been reached. Note that different + * initializations might lead to different results. + * */ @SuppressWarnings("serial") -public class LabelPropagation<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Long, NullValue> { +public class LabelPropagation<K extends Comparable<K> & Serializable> + implements GraphAlgorithm<K, Long, NullValue> { - private final int maxIterations; + private final int maxIterations; - public LabelPropagation(int maxIterations) { - this.maxIterations = maxIterations; - } + public LabelPropagation(int maxIterations) { + this.maxIterations = maxIterations; + } - @Override - public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) { + @Override + public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) { - // iteratively adopt the most frequent label among the neighbors - // of each vertex - return input.runVertexCentricIteration( - new UpdateVertexLabel<K>(), - new SendNewLabelToNeighbors<K>(), - maxIterations - ); - } + // iteratively adopt the most frequent label among the neighbors + // of each vertex + return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), + new SendNewLabelToNeighbors<K>(), maxIterations); + } - /** - * Function that updates the value of a vertex by adopting the most frequent label - * among its in-neighbors - */ - public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable> - extends VertexUpdateFunction<K, Long, Long> { + /** + * Function that updates the value of a vertex by adopting the most frequent + * label among its in-neighbors + */ + public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable> + extends VertexUpdateFunction<K, Long, Long> { - public void updateVertex(K vertexKey, Long vertexValue, MessageIterator<Long> inMessages) { + public void updateVertex(K vertexKey, Long vertexValue, + MessageIterator<Long> inMessages) { Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>(); long maxFrequency = 1; @@ -60,12 +78,12 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements if (labelsWithFrequencies.containsKey(msg)) { long currentFreq = labelsWithFrequencies.get(msg); labelsWithFrequencies.put(msg, currentFreq + 1); - } - else { + } else { labelsWithFrequencies.put(msg, 1L); } } - // select the most frequent label: if two or more labels have the same frequency, + // select the most frequent label: if two or more labels have the + // same frequency, // the node adopts the label with the highest value for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) { if (entry.getValue() == maxFrequency) { @@ -73,8 +91,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements if (entry.getKey() > mostFrequentLabel) { mostFrequentLabel = entry.getKey(); } - } - else if (entry.getValue() > maxFrequency) { + } else if (entry.getValue() > maxFrequency) { maxFrequency = entry.getValue(); mostFrequentLabel = entry.getKey(); } @@ -83,16 +100,16 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements // set the new vertex value setNewVertexValue(mostFrequentLabel); } - } + } - /** - * Sends the vertex label to all out-neighbors - */ - public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable> - extends MessagingFunction<K, Long, Long, NullValue> { + /** + * Sends the vertex label to all out-neighbors + */ + public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable> + extends MessagingFunction<K, Long, Long, NullValue> { - public void sendMessages(K vertexKey, Long newLabel) { - sendMessageToAllNeighbors(newLabel); - } - } + public void sendMessages(K vertexKey, Long newLabel) { + sendMessageToAllNeighbors(newLabel); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java index d29a9dc..39b8ef1 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java @@ -1,76 +1,95 @@ -package flink.graphs.library; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import flink.graphs.Edge; -import flink.graphs.Graph; -import flink.graphs.GraphAlgorithm; -import flink.graphs.spargel.MessageIterator; -import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.VertexUpdateFunction; +package org.apache.flink.graph.library; import java.io.Serializable; -public class PageRank<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> { +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; - private long numVertices; - private double beta; - private int maxIterations; +public class PageRank<K extends Comparable<K> & Serializable> implements + GraphAlgorithm<K, Double, Double> { - public PageRank(long numVertices, double beta, int maxIterations) { - this.numVertices = numVertices; - this.beta = beta; - this.maxIterations = maxIterations; - } + private long numVertices; + private double beta; + private int maxIterations; - @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> network) { - return network.runVertexCentricIteration( - new VertexRankUpdater<K>(numVertices, beta), - new RankMessenger<K>(), - maxIterations - ); - } + public PageRank(long numVertices, double beta, int maxIterations) { + this.numVertices = numVertices; + this.beta = beta; + this.maxIterations = maxIterations; + } + @Override + public Graph<K, Double, Double> run(Graph<K, Double, Double> network) { + return network.runVertexCentricIteration(new VertexRankUpdater<K>( + numVertices, beta), new RankMessenger<K>(), maxIterations); + } - /** - * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages - * and then applying the dampening formula. - */ - @SuppressWarnings("serial") - public static final class VertexRankUpdater<K extends Comparable<K> & Serializable> extends VertexUpdateFunction<K, Double, Double> { + /** + * Function that updates the rank of a vertex by summing up the partial + * ranks from all incoming messages and then applying the dampening formula. + */ + @SuppressWarnings("serial") + public static final class VertexRankUpdater<K extends Comparable<K> & Serializable> + extends VertexUpdateFunction<K, Double, Double> { - private final long numVertices; - private final double beta; + private final long numVertices; + private final double beta; - public VertexRankUpdater(long numVertices, double beta) { - this.numVertices = numVertices; - this.beta = beta; - } + public VertexRankUpdater(long numVertices, double beta) { + this.numVertices = numVertices; + this.beta = beta; + } - @Override - public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double> inMessages) { - double rankSum = 0.0; - for (double msg : inMessages) { - rankSum += msg; - } + @Override + public void updateVertex(K vertexKey, Double vertexValue, + MessageIterator<Double> inMessages) { + double rankSum = 0.0; + for (double msg : inMessages) { + rankSum += msg; + } - // apply the dampening factor / random jump - double newRank = (beta * rankSum) + (1-beta)/numVertices; - setNewVertexValue(newRank); - } - } + // apply the dampening factor / random jump + double newRank = (beta * rankSum) + (1 - beta) / numVertices; + setNewVertexValue(newRank); + } + } - /** - * Distributes the rank of a vertex among all target vertices according to the transition probability, - * which is associated with an edge as the edge value. - */ - @SuppressWarnings("serial") - public static final class RankMessenger<K extends Comparable<K> & Serializable> extends MessagingFunction<K, Double, Double, Double> { + /** + * Distributes the rank of a vertex among all target vertices according to + * the transition probability, which is associated with an edge as the edge + * value. + */ + @SuppressWarnings("serial") + public static final class RankMessenger<K extends Comparable<K> & Serializable> + extends MessagingFunction<K, Double, Double, Double> { - @Override - public void sendMessages(K vertexId, Double newRank) { - for (Edge<K, Double> edge : getOutgoingEdges()) { - sendMessageTo(edge.getTarget(), newRank * edge.getValue()); - } - } - } + @Override + public void sendMessages(K vertexId, Double newRank) { + for (Edge<K, Double> edge : getOutgoingEdges()) { + sendMessageTo(edge.getTarget(), newRank * edge.getValue()); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 0da8a90..2f575e7 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -1,94 +1,114 @@ -package flink.graphs.library; - -import flink.graphs.*; -import flink.graphs.spargel.MessageIterator; -import flink.graphs.spargel.MessagingFunction; -import flink.graphs.spargel.VertexUpdateFunction; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; import java.io.Serializable; @SuppressWarnings("serial") -public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> { +public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> + implements GraphAlgorithm<K, Double, Double> { + + private final K srcVertexId; + private final Integer maxIterations; - private final K srcVertexId; - private final Integer maxIterations; + public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) { + this.srcVertexId = srcVertexId; + this.maxIterations = maxIterations; + } - public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) { - this.srcVertexId = srcVertexId; - this.maxIterations = maxIterations; - } + @Override + public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { - @Override - public Graph<K, Double, Double> run(Graph<K, Double, Double> input) { + return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) + .runVertexCentricIteration(new VertexDistanceUpdater<K>(), + new MinDistanceMessenger<K>(), maxIterations); + } - return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) - .runVertexCentricIteration( - new VertexDistanceUpdater<K>(), - new MinDistanceMessenger<K>(), - maxIterations - ); - } + public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> + implements MapFunction<Vertex<K, Double>, Double> { - public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> - implements MapFunction<Vertex<K,Double>, Double> { + private K srcVertexId; - private K srcVertexId; + public InitVerticesMapper(K srcId) { + this.srcVertexId = srcId; + } - public InitVerticesMapper(K srcId) { - this.srcVertexId = srcId; - } - public Double map(Vertex<K, Double> value) { if (value.f0.equals(srcVertexId)) { return 0.0; - } - else { + } else { return Double.MAX_VALUE; } } - } - - /** - * Function that updates the value of a vertex by picking the minimum distance from all incoming messages. - * - * @param <K> - */ - public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable> - extends VertexUpdateFunction<K, Double, Double> { - - @Override - public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double> inMessages) { - - Double minDistance = Double.MAX_VALUE; - - for (double msg : inMessages) { - if (msg < minDistance) { - minDistance = msg; - } - } - - if (vertexValue > minDistance) { - setNewVertexValue(minDistance); - } - } - } - - /** - * Distributes the minimum distance associated with a given vertex among all the target vertices - * summed up with the edge's value. - * - * @param <K> - */ - public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable> - extends MessagingFunction<K, Double, Double, Double> { - - @Override - public void sendMessages(K vertexKey, Double newDistance) throws Exception { - for (Edge<K, Double> edge : getOutgoingEdges()) { - sendMessageTo(edge.getTarget(), newDistance + edge.getValue()); - } - } - } + } + + /** + * Function that updates the value of a vertex by picking the minimum + * distance from all incoming messages. + * + * @param <K> + */ + public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable> + extends VertexUpdateFunction<K, Double, Double> { + + @Override + public void updateVertex(K vertexKey, Double vertexValue, + MessageIterator<Double> inMessages) { + + Double minDistance = Double.MAX_VALUE; + + for (double msg : inMessages) { + if (msg < minDistance) { + minDistance = msg; + } + } + + if (vertexValue > minDistance) { + setNewVertexValue(minDistance); + } + } + } + + /** + * Distributes the minimum distance associated with a given vertex among all + * the target vertices summed up with the edge's value. + * + * @param <K> + */ + public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable> + extends MessagingFunction<K, Double, Double, Double> { + + @Override + public void sendMessages(K vertexKey, Double newDistance) + throws Exception { + for (Edge<K, Double> edge : getOutgoingEdges()) { + sendMessageTo(edge.getTarget(), newDistance + edge.getValue()); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java deleted file mode 100644 index 695b2b8..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * - */ -package flink.graphs; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java index 08ba2c0..d6fdc8a 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package flink.graphs.spargel; +package org.apache.flink.graph.spargel; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index ab451bb..e8a297f 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package flink.graphs.spargel; +package org.apache.flink.graph.spargel; import java.io.Serializable; import java.util.Collection; @@ -26,11 +26,10 @@ import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; -import flink.graphs.Edge; - /** * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}. * http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java index 5f89e90..9c72485 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package flink.graphs.spargel; +package org.apache.flink.graph.spargel; import java.io.Serializable; import java.util.ArrayList; @@ -38,11 +38,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; -import flink.graphs.Edge; -import flink.graphs.Vertex; - /** * This class represents iterative graph computations, programmed in a vertex-centric perspective. * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been @@ -106,7 +105,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, - int maximumNumberOfIterations, boolean edgeHasValueMarker) + int maximumNumberOfIterations) { Validate.notNull(uf); Validate.notNull(mf); @@ -319,7 +318,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, int maximumNumberOfIterations) { - return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true); + return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java index e30451c..1157a18 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java @@ -16,18 +16,17 @@ * limitations under the License. */ -package flink.graphs.spargel; +package org.apache.flink.graph.spargel; import java.io.Serializable; import java.util.Collection; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; +import org.apache.flink.graph.Vertex; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; -import flink.graphs.Vertex; - /** * This class must be extended by functions that compute the state of the vertex depending on the old state and the * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java index 86103a6..a7b7b62 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java @@ -1,11 +1,28 @@ -package flink.graphs.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; import java.io.Serializable; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; - -import flink.graphs.Edge; +import org.apache.flink.graph.Edge; public class EdgeToTuple3Map<K extends Comparable<K> & Serializable, EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> { http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java index 0b0dc18..aba1c14 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java @@ -1,4 +1,22 @@ -package flink.graphs.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; import java.util.ArrayList; import java.util.List; @@ -14,27 +32,26 @@ public class GraphUtils { @SuppressWarnings({ "unchecked", "rawtypes" }) public static DataSet<Integer> count(DataSet set, ExecutionEnvironment env) { - List<Integer> list = new ArrayList<>(); + List<Integer> list = new ArrayList<Integer>(); list.add(0); DataSet<Integer> initialCount = env.fromCollection(list); - return set - .map(new OneMapper()) - .union(initialCount) - .reduce(new AddOnesReducer()) - .first(1); - } + return set.map(new OneMapper()).union(initialCount) + .reduce(new AddOnesReducer()).first(1); + } + + private static final class OneMapper<T extends Tuple> implements + MapFunction<T, Integer> { + @Override + public Integer map(T o) throws Exception { + return 1; + } + } - private static final class OneMapper<T extends Tuple> implements MapFunction<T, Integer> { - @Override - public Integer map(T o) throws Exception { - return 1; - } - } - - private static final class AddOnesReducer implements ReduceFunction<Integer> { - @Override - public Integer reduce(Integer one, Integer two) throws Exception { - return one + two; - } - } + private static final class AddOnesReducer implements + ReduceFunction<Integer> { + @Override + public Integer reduce(Integer one, Integer two) throws Exception { + return one + two; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java index 893ae95..d58e4ff 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java @@ -1,11 +1,28 @@ -package flink.graphs.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; import java.io.Serializable; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; - -import flink.graphs.Vertex; +import org.apache.flink.graph.Vertex; public class Tuple2ToVertexMap<K extends Comparable<K> & Serializable, VV extends Serializable> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> { http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java index 2a6cb23..3668dd2 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java @@ -1,11 +1,28 @@ -package flink.graphs.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; import java.io.Serializable; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple3; - -import flink.graphs.Edge; +import org.apache.flink.graph.Edge; /** * create an Edge DataSetfrom a Tuple3 dataset http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java index 30f867d..318e1ed 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java @@ -1,11 +1,28 @@ -package flink.graphs.utils; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.utils; import java.io.Serializable; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; - -import flink.graphs.Vertex; +import org.apache.flink.graph.Vertex; public class VertexToTuple2Map<K extends Comparable<K> & Serializable, VV extends Serializable> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> { http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java index 88dfcde..47209b3 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java @@ -1,21 +1,39 @@ -package flink.graphs.validation; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; + import java.io.Serializable; import org.apache.flink.api.java.DataSet; - -import flink.graphs.Graph; +import org.apache.flink.graph.Graph; /** * A validation method for different types of Graphs - * + * * @param <K> * @param <VV> * @param <EV> */ @SuppressWarnings("serial") -public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> implements Serializable{ +public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + implements Serializable { - public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph); + public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java index 6b7a619..b043f3c 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java @@ -1,77 +1,87 @@ -package flink.graphs.validation; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.validation; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.utils.GraphUtils; import org.apache.flink.util.Collector; -import flink.graphs.Edge; -import flink.graphs.Graph; -import flink.graphs.Vertex; -import flink.graphs.utils.GraphUtils; - import java.io.Serializable; @SuppressWarnings("serial") -public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> extends GraphValidator<K, VV, EV> { - - /** - * Checks that the edge set input contains valid vertex Ids, - * i.e. that they also exist in the vertex input set. - * @return a singleton DataSet<Boolean> stating whether a graph is valid - * with respect to its vertex ids. - */ - @Override - public DataSet<Boolean> validate(Graph<K, VV, EV> graph) { - DataSet<Tuple1<K>> edgeIds = graph.getEdges().flatMap(new MapEdgeIds<K, EV>()).distinct(); - DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0).equalTo(0) - .with(new GroupInvalidIds<K, VV>()).first(1); - - return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), graph.getContext()) - .map(new InvalidIdsMap()); - } - - private static final class MapEdgeIds<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, - Tuple1<K>> { - - @Override - public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { - out.collect(new Tuple1<K>(edge.f0)); - out.collect(new Tuple1<K>(edge.f1)); - } - } +public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + extends GraphValidator<K, VV, EV> { - private static final class GroupInvalidIds<K extends Comparable<K> & Serializable, - VV extends Serializable> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> { + /** + * Checks that the edge set input contains valid vertex Ids, i.e. that they + * also exist in the vertex input set. + * + * @return a singleton DataSet<Boolean> stating whether a graph is valid + * with respect to its vertex ids. + */ + @Override + public DataSet<Boolean> validate(Graph<K, VV, EV> graph) { + DataSet<Tuple1<K>> edgeIds = graph.getEdges() + .flatMap(new MapEdgeIds<K, EV>()).distinct(); + DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0) + .equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1); - @Override - public void coGroup(Iterable<Vertex<K, VV>> vertexId, - Iterable<Tuple1<K>> edgeId, Collector<K> out) { - if (!(vertexId.iterator().hasNext())) { - // found an id that doesn't exist in the vertex set - out.collect(edgeId.iterator().next().f0); - } - } - } + return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), + graph.getContext()).map(new InvalidIdsMap()); + } - private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> { + private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { + public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { + out.collect(new Tuple1<K>(edge.f0)); + out.collect(new Tuple1<K>(edge.f1)); + } + } - @Override - public Tuple1<K> map (K key)throws Exception { - return new Tuple1<>(key); - } - } + private static final class GroupInvalidIds<K extends Comparable<K> & Serializable, VV extends Serializable> + implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> { + public void coGroup(Iterable<Vertex<K, VV>> vertexId, + Iterable<Tuple1<K>> edgeId, Collector<K> out) { + if (!(vertexId.iterator().hasNext())) { + // found an id that doesn't exist in the vertex set + out.collect(edgeId.iterator().next().f0); + } + } + } - private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> { + private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> { + public Tuple1<K> map(K key) throws Exception { + return new Tuple1<K>(key); + } + } - @Override - public Boolean map (Integer numberOfInvalidIds)throws Exception { - return numberOfInvalidIds == 0; - } - } + private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> { + public Boolean map(Integer numberOfInvalidIds) throws Exception { + return numberOfInvalidIds == 0; + } + } } \ No newline at end of file