[FLINK-1201] [gelly] filtering bad records and top track per user
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cef2105 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cef2105 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cef2105 Branch: refs/heads/master Commit: 3cef2105ae4a1c6f2c2780539ad9fc7d76747e9d Parents: 2b98beb Author: vasia <vasilikikala...@gmail.com> Authored: Sun Dec 14 22:44:24 2014 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../flink/graph/example/MusicProfiles.java | 102 +++++++++++++++++++ .../flink/graph/utils/Tuple3ToEdgeMap.java | 25 +++++ 2 files changed, 127 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3cef2105/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java new file mode 100644 index 0000000..cd425a3 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -0,0 +1,102 @@ +package flink.graphs.example; + +import java.util.Iterator; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.types.NullValue; +import org.apache.flink.util.Collector; + +import 
flink.graphs.Edge; +import flink.graphs.EdgeDirection; +import flink.graphs.EdgesFunction; +import flink.graphs.Graph; +import flink.graphs.utils.Tuple3ToEdgeMap; + +public class MusicProfiles implements ProgramDescription { + + @SuppressWarnings("serial") + public static void main (String [] args) throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + /** read the user-song-play triplets + * The format is <userID>\t<songID>\t<playcount> + */ + DataSet<Tuple3<String, String, Integer>> triplets = env.readCsvFile(args[0]) + .lineDelimiter("\n").fieldDelimiter('\t').types(String.class, String.class, Integer.class); + + /** + * read the mismatches dataset and extract the songIDs + * The format is "ERROR: <songID trackID> song_title" + */ + DataSet<Tuple1<String>> mismatches = env.readTextFile(args[1]).map( + new MapFunction<String, Tuple1<String>>() { + public Tuple1<String> map(String value) { + // TODO Auto-generated method stub + return null; + } + }); + + // filter out the mismatches from the triplets dataset + DataSet<Tuple3<String, String, Integer>> validTriplets = triplets.coGroup(mismatches) + .where(1).equalTo(0).with(new CoGroupFunction<Tuple3<String, String, Integer>, Tuple1<String>, + Tuple3<String, String, Integer>>() { + public void coGroup( + Iterable<Tuple3<String, String, Integer>> triplets, + Iterable<Tuple1<String>> invalidSongs, + Collector<Tuple3<String, String, Integer>> out) { + if (!invalidSongs.iterator().hasNext()) { + // this is a valid triplet + out.collect(triplets.iterator().next()); + } + } + }); + + // Create a user -> song weighted bipartite graph + // where the edge weights correspond to play counts + DataSet<Edge<String, Integer>> userSongEdges = validTriplets.map( + new Tuple3ToEdgeMap<String, Integer>()); + + Graph<String, NullValue, Integer> userSongGraph = Graph.create(userSongEdges, env); + + // get the top track (most listened) for each user + DataSet<Tuple2<String, String>> 
usersWithTopTrack = userSongGraph.reduceOnEdges( + new EdgesFunction<String, Integer, String>() { + public Tuple2<String, String> iterateEdges( + Iterable<Tuple2<String, Edge<String, Integer>>> edges) { + int maxPlaycount = 0; + String userId = ""; + String topSong = ""; + + final Iterator<Tuple2<String, Edge<String, Integer>>> edgesIterator = + edges.iterator(); + if (edgesIterator.hasNext()) { + Tuple2<String, Edge<String, Integer>> first = edgesIterator.next(); + userId = first.f0; + topSong = first.f1.getTarget(); + } + while (edgesIterator.hasNext()) { + Tuple2<String, Edge<String, Integer>> edge = edgesIterator.next(); + if (edge.f1.getValue() > maxPlaycount) { + maxPlaycount = edge.f1.getValue(); + topSong = edge.f1.getTarget(); + } + } + return new Tuple2<String, String> (userId, topSong); + } + }, EdgeDirection.OUT); + } + + @Override + public String getDescription() { + return null; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3cef2105/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java new file mode 100644 index 0000000..2a6cb23 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java @@ -0,0 +1,25 @@ +package flink.graphs.utils; + +import java.io.Serializable; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; + +import flink.graphs.Edge; + +/** + * create an Edge DataSet from a Tuple3 dataset + * + * @param <K> + * @param <EV> + */ +public class Tuple3ToEdgeMap<K extends Comparable<K> & Serializable, + EV extends Serializable> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> { + + private static final long serialVersionUID = 
1L; + + public Edge<K, EV> map(Tuple3<K, K, EV> tuple) { + return new Edge<K, EV>(tuple.f0, tuple.f1, tuple.f2); + } + +}