[FLINK-1201] [gelly] use fromTupleDataSet method instead of Tuple3ToEdgeMap
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09d39ce6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09d39ce6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09d39ce6 Branch: refs/heads/master Commit: 09d39ce6538171a8935a47343e231401cef0c845 Parents: f99691c Author: vasia <vasilikikala...@gmail.com> Authored: Mon Jan 19 18:20:19 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 11 10:46:15 2015 +0100 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Graph.java | 419 ++++++++++--------- .../flink/graph/example/MusicProfiles.java | 5 +- 2 files changed, 231 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/09d39ce6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 3caf13d..a236478 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -38,7 +38,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst; -import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -53,6 +52,8 @@ import flink.graphs.spargel.MessagingFunction; import 
flink.graphs.spargel.VertexCentricIteration; import flink.graphs.spargel.VertexUpdateFunction; import flink.graphs.utils.GraphUtils; +import flink.graphs.utils.Tuple2ToVertexMap; +import flink.graphs.utils.Tuple3ToEdgeMap; import flink.graphs.validation.GraphValidator; /** @@ -74,19 +75,211 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab private final DataSet<Edge<K, EV>> edges; /** - * Creates a graph from two datasets: vertices and edges and allow setting the undirected property + * Creates a graph from two DataSets: vertices and edges and allow setting the undirected property * * @param vertices a DataSet of vertices. * @param edges a DataSet of vertices. * @param context the flink execution environment. */ - public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { + private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { this.vertices = vertices; this.edges = edges; this.context = context; } /** + * Creates a graph from a Collection of vertices and a Collection of edges. + * @param vertices a Collection of vertices. + * @param edges a Collection of vertices. + * @param context the flink execution environment. + * @return the newly created graph. + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, + Collection<Edge<K,EV>> edges, + ExecutionEnvironment context) { + + return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context); + } + + /** + * Creates a graph from a Collection of edges, vertices are induced from the edges. + * Vertices are created automatically and their values are set to NullValue. + * @param edges a Collection of vertices. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + public static <K extends Comparable<K> & Serializable, EV extends Serializable> + Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> edges, ExecutionEnvironment context) { + + return fromDataSet(context.fromCollection(edges), context); + } + + /** + * Creates a graph from a Collection of edges, vertices are induced from the edges and + * vertex values are calculated by a mapper function. + * Vertices are created automatically and their values are set + * by applying the provided map function to the vertex ids. + * @param edges a Collection of vertices. + * @param mapper the mapper function. + * @param context the flink execution environment. + * @return the newly created graph. + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges, + final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { + + return fromDataSet(context.fromCollection(edges), mapper, context); + } + + /** + * Creates a graph from a DataSet of vertices and a DataSet of edges. + * @param vertices a DataSet of vertices. + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph. + */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices, + DataSet<Edge<K,EV>> edges, + ExecutionEnvironment context) { + + return new Graph<K, VV, EV>(vertices, edges, context); + } + + /** + * Creates a graph from a DataSet of edges, vertices are induced from the edges. + * Vertices are created automatically and their values are set to NullValue. + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + public static <K extends Comparable<K> & Serializable, EV extends Serializable> + Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges, + ExecutionEnvironment context) { + + DataSet<Vertex<K, NullValue>> vertices = + edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); + + return new Graph<K, NullValue, EV>(vertices, edges, context); + } + + private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> + implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { + + public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) { + out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); + out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance())); + } + } + + /** + * Creates a graph from a DataSet of edges, vertices are induced from the edges and + * vertex values are calculated by a mapper function. + * Vertices are created automatically and their values are set + * by applying the provided map function to the vertex ids. + * @param edges a DataSet of vertices. + * @param mapper the mapper function. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges, + final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { + + TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); + + TypeInformation<VV> valueType = TypeExtractor + .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) + new TupleTypeInfo(Vertex.class, keyType, valueType); + + DataSet<Vertex<K, VV>> vertices = + edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) + .distinct() + .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() { + public Vertex<K, VV> map(Tuple1<K> value) throws Exception { + return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); + } + }) + .returns(returnType); + + return new Graph<K, VV, EV>(vertices, edges, context); + } + + private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, + EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { + + public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { + out.collect(new Tuple1<K>(edge.f0)); + out.collect(new Tuple1<K>(edge.f1)); + } + } + + /** + * Creates a graph from a DataSet of Tuple objects for vertices and edges. + * + * Vertices with value are created from Tuple2, + * Edges with value are created from Tuple3. + * + * @param vertices a DataSet of vertices. + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> vertices, + DataSet<Tuple3<K, K, EV>> edges, + ExecutionEnvironment context) { + + DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>()); + DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); + return fromDataSet(vertexDataSet, edgeDataSet, context); + } + + /** + * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges. + * + * Edges with value are created from Tuple3. + * Vertices are created automatically and their values are set to NullValue. + * + * @param edges a DataSet of vertices. + * @param context the flink execution environment. + * @return the newly created graph. + */ + public static <K extends Comparable<K> & Serializable, EV extends Serializable> + Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, + ExecutionEnvironment context) { + + DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); + return fromDataSet(edgeDataSet, context); + } + + /** + * Creates a graph from a DataSet of Tuple objects for edges, vertices are induced from the edges and + * vertex values are calculated by a mapper function. + * Edges with value are created from Tuple3. + * Vertices are created automatically and their values are set + * by applying the provided map function to the vertex ids. + * @param edges a DataSet of vertices. + * @param mapper the mapper function. + * @param context the flink execution environment. + * @return the newly created graph. 
+ */ + public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> + Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges, + final MapFunction<K, VV> mapper, + ExecutionEnvironment context) { + + DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>()); + return fromDataSet(edgeDataSet, mapper, context); + } + + /** * @return the flink execution environment. */ public ExecutionEnvironment getContext() { @@ -102,19 +295,35 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * @return the vertex dataset. + * @return the vertex DataSet. */ public DataSet<Vertex<K, VV>> getVertices() { return vertices; } /** - * @return the edge dataset. + * @return the edge DataSet. */ public DataSet<Edge<K, EV>> getEdges() { return edges; } + /** + * @return the vertex DataSet as Tuple2. + */ + @SuppressWarnings({ "unchecked" }) + public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() { + return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices; + } + + /** + * @return the edge DataSet as Tuple3. + */ + @SuppressWarnings({ "unchecked" }) + public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() { + return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges; + } + /** * Apply a function to the attribute of each vertex in the graph. * @param mapper the map function to apply. 
@@ -139,7 +348,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab }) .returns(returnType); - return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context); + return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context); } /** @@ -180,7 +389,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper)); - return Graph.create(resultedVertices, this.getEdges(), this.getContext()); + return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context); } private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> & Serializable, @@ -226,7 +435,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0,1).equalTo(0,1) .with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper)); - return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> & Serializable, @@ -275,7 +484,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper)); - return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & Serializable, @@ -327,7 +536,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab .coGroup(inputDataSet).where(1).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, 
T>(mapper)); - return Graph.create(this.getVertices(), resultedEdges, this.getContext()); + return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); } /** @@ -650,120 +859,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } /** - * Creates a graph from a dataset of vertices and a dataset of edges - * @param vertices a DataSet of vertices. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, VV, EV> - create(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, - ExecutionEnvironment context) { - return new Graph<K, VV, EV>(vertices, edges, context); - } - - /** - * Creates a graph from a DataSet of edges. - * Vertices are created automatically and their values are set to NullValue. - * @param edges a DataSet of vertices. - * @param context the flink execution environment. - * @return the newly created graph - */ - public static <K extends Comparable<K> & Serializable, EV extends Serializable> - Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { - DataSet<Vertex<K, NullValue>> vertices = - edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); - return new Graph<K, NullValue, EV>(vertices, edges, context); - } - - /** - * Creates a graph from a DataSet of edges. - * Vertices are created automatically and their values are set - * by applying the provided map function to the vertex ids. 
- * @param edges the input edges - * @param mapper the map function to set the initial vertex value - * @return the newly created graph - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable> - Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> mapper, - ExecutionEnvironment context) { - TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0); - - TypeInformation<VV> valueType = TypeExtractor - .createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null); - - @SuppressWarnings({ "unchecked", "rawtypes" }) - TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) - new TupleTypeInfo(Vertex.class, keyType, valueType); - - DataSet<Vertex<K, VV>> vertices = - edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()) - .distinct().map(new MapFunction<Tuple1<K>, Vertex<K, VV>>(){ - public Vertex<K, VV> map(Tuple1<K> value) throws Exception { - return new Vertex<K, VV>(value.f0, mapper.map(value.f0)); - } - }).returns(returnType); - return new Graph<K, VV, EV>(vertices, edges, context); - } - - private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable, EV extends Serializable> - implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> { - public void flatMap(Edge<K, EV> edge, - Collector<Vertex<K, NullValue>> out) { - - out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); - out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance())); - } - } - - private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> & Serializable, - EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { - public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { - - out.collect(new Tuple1<K>(edge.f0)); - out.collect(new Tuple1<K>(edge.f1)); - } - } - - /** - * Read and create the graph vertex Tuple2 DataSet from a csv file - * - * The CSV file should be of the 
following format: - * - * <vertexID><delimiter><vertexValue> - * - * For example, with space delimiter: - * - * 1 57 - * 2 45 - * 3 77 - * 4 12 - * - * @param context the flink execution environment. - * @param filePath the path to the CSV file. - * @param delimiter the CSV delimiter. - * @param Tuple2IdClass The class to use for Vertex IDs - * @param Tuple2ValueClass The class to use for Vertex Values - * @return a set of vertices and their values. - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable> - DataSet<Tuple2<K, VV>> - readTuple2CsvFile(ExecutionEnvironment context, String filePath, - char delimiter, Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass) { - - CsvReader reader = new CsvReader(filePath, context); - DataSet<Tuple2<K, VV>> vertices = reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass) - .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() { - - public Tuple2<K, VV> map(Tuple2<K, VV> value) throws Exception { - return (Tuple2<K, VV>)value; - } - }); - return vertices; - } - - /** * @return Singleton DataSet containing the vertex count */ public DataSet<Integer> numberOfVertices () { @@ -880,14 +975,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab } } - public Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, Collection<Edge<K,EV>> edges) { - - DataSet<Vertex<K, VV>> v = context.fromCollection(vertices); - DataSet<Edge<K, EV>> e = context.fromCollection(edges); - - return new Graph<K, VV, EV>(v, e, context); - } - /** * Adds the input vertex and edges to the graph. 
* If the vertex already exists in the graph, it will not be added again, @@ -902,14 +989,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab // Take care of empty edge set if (edges.isEmpty()) { - return Graph.create(getVertices().union(newVertex).distinct(), getEdges(), context); + return new Graph<K, VV, EV>(this.vertices.union(newVertex).distinct(), this.edges, this.context); } // Add the vertex and its edges - DataSet<Vertex<K, VV>> newVertices = getVertices().union(newVertex).distinct(); - DataSet<Edge<K, EV>> newEdges = getEdges().union(context.fromCollection(edges)); + DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct(); + DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges)); - return Graph.create(newVertices, newEdges, context); + return new Graph<K, VV, EV>(newVertices, newEdges, this.context); } /** @@ -922,8 +1009,11 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab * @return the new graph containing the existing vertices and edges plus the newly added edge */ public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, EV edgeValue) { - Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, target), - Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue))); + Graph<K,VV,EV> partialGraph = fromCollection( + Arrays.asList(source, target), + Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)), + this.context + ); return this.union(partialGraph); } @@ -986,7 +1076,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) { DataSet<Edge<K, EV>> newEdges = getEdges().filter( new EdgeRemovalEdgeFilter<K, EV>(edge)); - return new Graph<K, VV, EV>(this.getVertices(), newEdges, this.context); + return new Graph<K, VV, EV>(this.vertices, newEdges, this.context); } private static final class 
EdgeRemovalEdgeFilter<K extends Comparable<K> & Serializable, @@ -1028,58 +1118,9 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab DataSet<Vertex<K, VV>> newVertices = vertices.runOperation( VertexCentricIteration.withEdges(edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations)); - return new Graph<K, VV, EV>(newVertices, edges, context); + return new Graph<K, VV, EV>(newVertices, this.edges, this.context); } - /** - * Creates a graph from the given vertex and edge collections - * @param context the flink execution environment. - * @param v the collection of vertices - * @param e the collection of edges - * @return a new graph formed from the set of edges and vertices - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, VV, EV> - fromCollection(ExecutionEnvironment context, Collection<Vertex<K, VV>> v, - Collection<Edge<K, EV>> e) throws Exception { - - DataSet<Vertex<K, VV>> vertices = context.fromCollection(v); - DataSet<Edge<K, EV>> edges = context.fromCollection(e); - - return Graph.create(vertices, edges, context); - } - - /** - * Vertices may not have a value attached or may receive a value as a result of running the algorithm. - * @param context the flink execution environment. - * @param e the collection of edges - * @return a new graph formed from the edges, with no value for the vertices - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, NullValue, EV> - fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e) { - - DataSet<Edge<K, EV>> edges = context.fromCollection(e); - - return Graph.create(edges, context); - } - - /** - * Vertices may have an initial value defined by a function. - * @param context the flink execution environment. 
- * @param e the collection of edges - * @return a new graph formed from the edges, with a custom value for the vertices, - * determined by the mapping function - */ - public static <K extends Comparable<K> & Serializable, VV extends Serializable, - EV extends Serializable> Graph<K, VV, EV> - fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e, - final MapFunction<K, VV> mapper) { - - DataSet<Edge<K, EV>> edges = context.fromCollection(e); - return Graph.create(edges, mapper, context); - } - public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) { return algorithm.run(this); } http://git-wip-us.apache.org/repos/asf/flink/blob/09d39ce6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java index 44f226f..668c765 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java @@ -23,7 +23,6 @@ import flink.graphs.Graph; import flink.graphs.Vertex; import flink.graphs.example.utils.MusicProfilesData; import flink.graphs.library.LabelPropagation; -import flink.graphs.utils.Tuple3ToEdgeMap; @SuppressWarnings("serial") public class MusicProfiles implements ProgramDescription { @@ -67,9 +66,7 @@ public class MusicProfiles implements ProgramDescription { * Create a user -> song weighted bipartite graph * where the edge weights correspond to play counts */ - DataSet<Edge<String, Integer>> userSongEdges = validTriplets.map(new Tuple3ToEdgeMap<String, Integer>()); - - Graph<String, NullValue, Integer> userSongGraph = Graph.fromDataSet(userSongEdges, env); + Graph<String, NullValue, Integer> userSongGraph = 
Graph.fromTupleDataSet(validTriplets, env); /** * Get the top track (most listened) for each user