[FLINK-1201] [gelly] added label propagation step in MusicProfiles
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/32d9d2b1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/32d9d2b1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/32d9d2b1 Branch: refs/heads/master Commit: 32d9d2b1342df40051257dc64f534b0a7da340ee Parents: 0393bc1 Author: vasia <vasilikikala...@gmail.com> Authored: Mon Jan 5 20:58:14 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../flink/graph/example/MusicProfiles.java | 66 +++++++++++++++++--- .../graph/example/utils/MusicProfilesData.java | 65 +++++++++++++++++++ 2 files changed, 122 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/32d9d2b1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index d74e339..02f6554 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -2,9 +2,11 @@ package flink.graphs.example; import java.util.ArrayList; import java.util.List; +import java.util.Random; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; @@ -20,26 +22,41 @@ import 
flink.graphs.EdgeDirection; import flink.graphs.EdgesFunctionWithVertexValue; import flink.graphs.Graph; import flink.graphs.Vertex; +import flink.graphs.example.utils.MusicProfilesData; +import flink.graphs.library.LabelPropagation; import flink.graphs.utils.Tuple3ToEdgeMap; public class MusicProfiles implements ProgramDescription { + /** + * This example demonstrates how to mix the "record" Flink API with the graph API. + * The input is a set of <userId - songId - playCount> triplets and a set of + * bad records, i.e. song ids that should not be trusted. + * Initially, we use the record API to filter out the bad records. + * Then, we use the graph API to create a user -> song weighted bipartite graph + * and compute the top song (most listened) per user. + * Then, we use the record API again, to create a user-user similarity graph, + * based on common songs, where two users that listen to the same song are connected. + * Finally, we use the graph API to run the label propagation community detection algorithm + * on the similarity graph. 
+ */ public static void main (String [] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + final long numberOfLabels = 3; + final int numIterations = 10; /** * Read the user-song-play triplets * The format is <userID>\t<songID>\t<playcount> */ - DataSet<Tuple3<String, String, Integer>> triplets = env.readCsvFile(args[0]) - .lineDelimiter("\n").fieldDelimiter('\t').types(String.class, String.class, Integer.class); - + DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env); + /** * Read the mismatches dataset and extract the songIDs * The format is "ERROR: <songID trackID> song_title" */ - DataSet<Tuple1<String>> mismatches = env.readTextFile(args[1]).map(new ExtractMismatchSongIds()); + DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds()); /** * Filter out the mismatches from the triplets dataset @@ -58,7 +75,8 @@ public class MusicProfiles implements ProgramDescription { * Get the top track (most listened) for each user */ DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph.reduceOnEdges(new GetTopSongPerUser(), - EdgeDirection.OUT); + EdgeDirection.OUT).filter(new FilterSongNodes()); + usersWithTopTrack.print(); /** * Create a user-user similarity graph, based on common songs, @@ -69,6 +87,15 @@ public class MusicProfiles implements ProgramDescription { .reduceGroup(new CreateSimilarUserEdges()).distinct(); Graph<String, NullValue, NullValue> similarUsersGraph = Graph.create(similarUsers, env).getUndirected(); + /** + * Detect user communities using the label propagation library method + */ + DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph.mapVertices( + new InitVertexLabels(numberOfLabels)) + .run(new LabelPropagation<String>(numIterations)).getVertices(); + verticesWithCommunity.print(); + + env.execute(); } @SuppressWarnings("serial") @@ -89,12 +116,21 @@ public class 
MusicProfiles implements ProgramDescription { Collector<Tuple3<String, String, Integer>> out) { if (!invalidSongs.iterator().hasNext()) { // this is a valid triplet - out.collect(triplets.iterator().next()); + for (Tuple3<String, String, Integer> triplet : triplets) { + out.collect(triplet); + } } } } @SuppressWarnings("serial") + public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> { + public boolean filter(Tuple2<String, String> value) throws Exception { + return !value.f1.equals(""); + } + } + + @SuppressWarnings("serial") public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue <String, NullValue, Integer, Tuple2<String, String>> { public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex, @@ -120,14 +156,26 @@ public class MusicProfiles implements ProgramDescription { listeners.add(edge.getSource()); } for (int i=0; i < listeners.size()-1; i++) { - out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(i+1))); + out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(i+1), + NullValue.getInstance())); } } } + + @SuppressWarnings("serial") + public static final class InitVertexLabels implements MapFunction<Vertex<String, NullValue>, Long> { + private long numberOfLabels; + public InitVertexLabels(long labels) { + this.numberOfLabels = labels; + } + public Long map(Vertex<String, NullValue> value) { + Random randomGenerator = new Random(); + return (long) randomGenerator.nextInt((int) numberOfLabels); + } + } @Override public String getDescription() { - return null; + return "Music Profiles Example"; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/32d9d2b1/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java new file mode 100644 index 0000000..cfe9c88 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java @@ -0,0 +1,65 @@ +package flink.graphs.example.utils; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; + +public class MusicProfilesData { + + public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) { + List<Tuple3<String, String, Integer>> triplets = new ArrayList<Tuple3<String, String, Integer>>(); + + triplets.add(new Tuple3<String, String, Integer>("user_1", "song_1", 100)); + triplets.add(new Tuple3<String, String, Integer>("user_1", "song_2", 10)); + triplets.add(new Tuple3<String, String, Integer>("user_1", "song_3", 20)); + triplets.add(new Tuple3<String, String, Integer>("user_1", "song_4", 30)); + triplets.add(new Tuple3<String, String, Integer>("user_1", "song_5", 1)); + + triplets.add(new Tuple3<String, String, Integer>("user_2", "song_6", 40)); + triplets.add(new Tuple3<String, String, Integer>("user_2", "song_7", 10)); + triplets.add(new Tuple3<String, String, Integer>("user_2", "song_8", 3)); + + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_1", 100)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_2", 10)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_3", 20)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_8", 30)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_9", 1)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_10", 8)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_11", 90)); + triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_12", 30)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_13", 34)); + triplets.add(new Tuple3<String, String, Integer>("user_3", "song_14", 17)); + + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_1", 100)); + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_6", 10)); + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_8", 20)); + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_12", 30)); + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_13", 1)); + triplets.add(new Tuple3<String, String, Integer>("user_4", "song_15", 1)); + + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_3", 300)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_4", 4)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_5", 5)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_8", 8)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_9", 9)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_10", 10)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_12", 12)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_13", 13)); + triplets.add(new Tuple3<String, String, Integer>("user_5", "song_15", 15)); + + triplets.add(new Tuple3<String, String, Integer>("user_6", "song_6", 30)); + + return env.fromCollection(triplets); + } + + public static DataSet<String> getMismatches(ExecutionEnvironment env) { + List<String> errors = new ArrayList<String>(); + errors.add("ERROR: <song_8 track_8> Sever"); + errors.add("ERROR: <song_15 track_15> Black Trees"); + return env.fromCollection(errors); + } +} +