[FLINK-1201] [gelly] use fromTupleDataSet method instead of Tuple3ToEdgeMap


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09d39ce6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09d39ce6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09d39ce6

Branch: refs/heads/master
Commit: 09d39ce6538171a8935a47343e231401cef0c845
Parents: f99691c
Author: vasia <vasilikikala...@gmail.com>
Authored: Mon Jan 19 18:20:19 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 419 ++++++++++---------
 .../flink/graph/example/MusicProfiles.java      |   5 +-
 2 files changed, 231 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09d39ce6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 3caf13d..a236478 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -38,7 +38,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
-import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -53,6 +52,8 @@ import flink.graphs.spargel.MessagingFunction;
 import flink.graphs.spargel.VertexCentricIteration;
 import flink.graphs.spargel.VertexUpdateFunction;
 import flink.graphs.utils.GraphUtils;
+import flink.graphs.utils.Tuple2ToVertexMap;
+import flink.graphs.utils.Tuple3ToEdgeMap;
 import flink.graphs.validation.GraphValidator;
 
 /**
@@ -74,19 +75,211 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        private final DataSet<Edge<K, EV>> edges;
 
        /**
-        * Creates a graph from two datasets: vertices and edges and allow 
setting the undirected property
+        * Creates a graph from two DataSets: vertices and edges and allow 
setting the undirected property
         *
         * @param vertices a DataSet of vertices.
         * @param edges a DataSet of vertices.
         * @param context the flink execution environment.
         */
-       public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
+       private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
                this.vertices = vertices;
                this.edges = edges;
         this.context = context;
        }
 
        /**
+        * Creates a graph from a Collection of vertices and a Collection of 
edges.
+        * @param vertices a Collection of vertices.
+        * @param edges a Collection of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> 
vertices,
+                                                                               
 Collection<Edge<K,EV>> edges,
+                                                                               
 ExecutionEnvironment context) {
+
+               return fromDataSet(context.fromCollection(vertices), 
context.fromCollection(edges), context);
+       }
+
+       /**
+        * Creates a graph from a Collection of edges, vertices are induced 
from the edges.
+        * Vertices are created automatically and their values are set to 
NullValue.
+        * @param edges a Collection of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
+               Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> 
edges, ExecutionEnvironment context) {
+
+               return fromDataSet(context.fromCollection(edges), context);
+       }
+
+       /**
+        * Creates a graph from a Collection of edges, vertices are induced 
from the edges and
+        * vertex values are calculated by a mapper function.
+        * Vertices are created automatically and their values are set
+        * by applying the provided map function to the vertex ids.
+        * @param edges a Collection of edges.
+        * @param mapper the mapper function.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges,
+                                                                               
 final MapFunction<K, VV> mapper,
+                                                                               
 ExecutionEnvironment context) {
+
+               return fromDataSet(context.fromCollection(edges), mapper, 
context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of vertices and a DataSet of edges.
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices,
+                                                                         
DataSet<Edge<K,EV>> edges,
+                                                                         
ExecutionEnvironment context) {
+
+               return new Graph<K, VV, EV>(vertices, edges, context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges.
+        * Vertices are created automatically and their values are set to 
NullValue.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
+               Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
+                                                                               
         ExecutionEnvironment context) {
+
+               DataSet<Vertex<K, NullValue>> vertices =
+                               edges.flatMap(new EmitSrcAndTarget<K, 
EV>()).distinct();
+
+               return new Graph<K, NullValue, EV>(vertices, edges, context);
+       }
+
+       private static final class EmitSrcAndTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable>
+                       implements FlatMapFunction<Edge<K, EV>, Vertex<K, 
NullValue>> {
+
+               public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, 
NullValue>> out) {
+                       out.collect(new Vertex<K, NullValue>(edge.f0, 
NullValue.getInstance()));
+                       out.collect(new Vertex<K, NullValue>(edge.f1, 
NullValue.getInstance()));
+               }
+       }
+
+       /**
+        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges and
+        * vertex values are calculated by a mapper function.
+        * Vertices are created automatically and their values are set
+        * by applying the provided map function to the vertex ids.
+        * @param edges a DataSet of edges.
+        * @param mapper the mapper function.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
+                                                                         final 
MapFunction<K, VV> mapper,
+                                                                         
ExecutionEnvironment context) {
+
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
+
+               TypeInformation<VV> valueType = TypeExtractor
+                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+
+               @SuppressWarnings({ "unchecked", "rawtypes" })
+               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
+                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
+
+               DataSet<Vertex<K, VV>> vertices =
+                               edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
+                                    .distinct()
+                                    .map(new MapFunction<Tuple1<K>, Vertex<K, 
VV>>() {
+                                                public Vertex<K, VV> 
map(Tuple1<K> value) throws Exception {
+                                                        return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
+                                                }
+                                        })
+                                    .returns(returnType);
+
+               return new Graph<K, VV, EV>(vertices, edges, context);
+       }
+
+       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable,
+                       EV extends Serializable> implements 
FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+
+               public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
+                       out.collect(new Tuple1<K>(edge.f0));
+                       out.collect(new Tuple1<K>(edge.f1));
+               }
+       }
+
+       /**
+        * Creates a graph from a DataSet of Tuple objects for vertices and 
edges.
+        *
+        * Vertices with value are created from Tuple2,
+        * Edges with value are created from Tuple3.
+        *
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+               Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> 
vertices,
+                                                                               
   DataSet<Tuple3<K, K, EV>> edges,
+                                                                               
   ExecutionEnvironment context) {
+
+               DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new 
Tuple2ToVertexMap<K, VV>());
+               DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
+               return fromDataSet(vertexDataSet, edgeDataSet, context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges.
+        *
+        * Edges with value are created from Tuple3.
+        * Vertices are created automatically and their values are set to 
NullValue.
+        *
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
+               Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, 
EV>> edges,
+                                                                               
                  ExecutionEnvironment context) {
+
+               DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
+               return fromDataSet(edgeDataSet, context);
+       }
+
+       /**
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges and
+        * vertex values are calculated by a mapper function.
+        * Edges with value are created from Tuple3.
+        * Vertices are created automatically and their values are set
+        * by applying the provided map function to the vertex ids.
+        * @param edges a DataSet of edges.
+        * @param mapper the mapper function.
+        * @param context the flink execution environment.
+        * @return the newly created graph.
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
+       Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges,
+                                                                          
final MapFunction<K, VV> mapper,
+                                                                          
ExecutionEnvironment context) {
+
+               DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
+               return fromDataSet(edgeDataSet, mapper, context);
+       }
+
+       /**
         * @return the flink execution environment.
         */
        public ExecutionEnvironment getContext() {
@@ -102,19 +295,35 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * @return the vertex dataset.
+        * @return the vertex DataSet.
         */
        public DataSet<Vertex<K, VV>> getVertices() {
                return vertices;
        }
 
        /**
-        * @return the edge dataset.
+        * @return the edge DataSet.
         */
        public DataSet<Edge<K, EV>> getEdges() {
                return edges;
        }
 
+       /**
+        * @return the vertex DataSet as Tuple2.
+        */
+       @SuppressWarnings({ "unchecked" })
+       public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
+               return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices;
+       }
+
+       /**
+        * @return the edge DataSet as Tuple3.
+        */
+       @SuppressWarnings({ "unchecked" })
+       public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
+               return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges;
+       }
+
     /**
      * Apply a function to the attribute of each vertex in the graph.
      * @param mapper the map function to apply.
@@ -139,7 +348,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                })
                        .returns(returnType);
 
-        return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), 
this.context);
+        return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
     }
 
     /**
@@ -180,7 +389,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new ApplyCoGroupToVertexValues<K, VV, 
T>(mapper));
-               return Graph.create(resultedVertices, this.getEdges(), 
this.getContext());
+               return new Graph<K, VV, EV>(resultedVertices, this.edges, 
this.context);
        }
 
        private static final class ApplyCoGroupToVertexValues<K extends 
Comparable<K> & Serializable,
@@ -226,7 +435,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(0,1).equalTo(0,1)
                                .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(mapper));
-               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
        }
 
        private static final class ApplyCoGroupToEdgeValues<K extends 
Comparable<K> & Serializable,
@@ -275,7 +484,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
        }
 
        private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & 
Serializable,
@@ -327,7 +536,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                                .coGroup(inputDataSet).where(1).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-               return Graph.create(this.getVertices(), resultedEdges, 
this.getContext());
+               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
        }
 
        /**
@@ -650,120 +859,6 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Creates a graph from a dataset of vertices and a dataset of edges
-        * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-               EV extends Serializable> Graph<K, VV, EV>
-               create(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, 
-                               ExecutionEnvironment context) {
-               return new Graph<K, VV, EV>(vertices, edges, context);
-       }
-       
-       /**
-        * Creates a graph from a DataSet of edges.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        * @param edges a DataSet of vertices.
-        * @param context the flink execution environment.
-        * @return the newly created graph
-        */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> 
-               Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, 
ExecutionEnvironment context) {
-               DataSet<Vertex<K, NullValue>> vertices = 
-                               edges.flatMap(new EmitSrcAndTarget<K, 
EV>()).distinct(); 
-               return new Graph<K, NullValue, EV>(vertices, edges, context);
-       }
-       
-       /**
-        * Creates a graph from a DataSet of edges.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges the input edges
-        * @param mapper the map function to set the initial vertex value
-        * @return the newly created graph
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> 
-               Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final 
MapFunction<K, VV> mapper, 
-                               ExecutionEnvironment context) {
-               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
-
-               TypeInformation<VV> valueType = TypeExtractor
-                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
-
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
-                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
-
-               DataSet<Vertex<K, VV>> vertices = 
-                               edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
-                               .distinct().map(new MapFunction<Tuple1<K>, 
Vertex<K, VV>>(){
-                                       public Vertex<K, VV> map(Tuple1<K> 
value) throws Exception {
-                                               return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
-                                       }
-                               }).returns(returnType);
-               return new Graph<K, VV, EV>(vertices, edges, context);
-       }
-
-       private static final class EmitSrcAndTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable>
-               implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
-               public void flatMap(Edge<K, EV> edge,
-                               Collector<Vertex<K, NullValue>> out) {
-
-                               out.collect(new Vertex<K, NullValue>(edge.f0, 
NullValue.getInstance()));
-                               out.collect(new Vertex<K, NullValue>(edge.f1, 
NullValue.getInstance()));
-               }       
-       }
-
-       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable, 
-               EV extends Serializable> implements FlatMapFunction<Edge<K, 
EV>, Tuple1<K>> {
-               public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
-
-                       out.collect(new Tuple1<K>(edge.f0));
-                       out.collect(new Tuple1<K>(edge.f1));
-               }       
-       }
-
-       /**
-        * Read and create the graph vertex Tuple2 DataSet from a csv file
-        *
-        * The CSV file should be of the following format:
-        *
-        * <vertexID><delimiter><vertexValue>
-        *
-        * For example, with space delimiter:
-        *
-        * 1 57
-        * 2 45
-        * 3 77
-        * 4 12
-        *
-        * @param context the flink execution environment.
-        * @param filePath the path to the CSV file.
-        * @param delimiter the CSV delimiter.
-        * @param Tuple2IdClass The class to use for Vertex IDs
-        * @param Tuple2ValueClass The class to use for Vertex Values
-        * @return a set of vertices and their values.
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable>
-               DataSet<Tuple2<K, VV>>
-               readTuple2CsvFile(ExecutionEnvironment context, String filePath,
-                       char delimiter, Class<K> Tuple2IdClass, Class<VV> 
Tuple2ValueClass) {
-
-               CsvReader reader = new CsvReader(filePath, context);
-               DataSet<Tuple2<K, VV>> vertices = 
reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass)
-               .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() {
-
-                       public Tuple2<K, VV> map(Tuple2<K, VV> value) throws 
Exception {
-                               return (Tuple2<K, VV>)value;
-                       }
-               });
-               return vertices;
-       }
-
-       /**
         * @return Singleton DataSet containing the vertex count
         */
        public DataSet<Integer> numberOfVertices () {
@@ -880,14 +975,6 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
         }
        }
        
-    public Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices, 
Collection<Edge<K,EV>> edges) {
-
-               DataSet<Vertex<K, VV>> v = context.fromCollection(vertices);
-               DataSet<Edge<K, EV>> e = context.fromCollection(edges);
-
-               return new Graph<K, VV, EV>(v, e, context);
-       }
-
        /**
         * Adds the input vertex and edges to the graph.
         * If the vertex already exists in the graph, it will not be added 
again,
@@ -902,14 +989,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
        // Take care of empty edge set
        if (edges.isEmpty()) {
-               return Graph.create(getVertices().union(newVertex).distinct(), 
getEdges(), context);
+               return new Graph<K, VV, 
EV>(this.vertices.union(newVertex).distinct(), this.edges, this.context);
        }
 
        // Add the vertex and its edges
-       DataSet<Vertex<K, VV>> newVertices = 
getVertices().union(newVertex).distinct();
-       DataSet<Edge<K, EV>> newEdges = 
getEdges().union(context.fromCollection(edges));
+       DataSet<Vertex<K, VV>> newVertices = 
this.vertices.union(newVertex).distinct();
+       DataSet<Edge<K, EV>> newEdges = 
this.edges.union(context.fromCollection(edges));
 
-       return Graph.create(newVertices, newEdges, context);
+       return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
     }
 
     /**
@@ -922,8 +1009,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
      * @return the new graph containing the existing vertices and edges plus 
the newly added edge
      */
     public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, 
EV edgeValue) {
-       Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, 
target),
-                               Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)));
+       Graph<K,VV,EV> partialGraph = fromCollection(
+                               Arrays.asList(source, target),
+                               Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)),
+                               this.context
+               );
         return this.union(partialGraph);
     }
 
@@ -986,7 +1076,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
     public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) {
                DataSet<Edge<K, EV>> newEdges = getEdges().filter(
                                new EdgeRemovalEdgeFilter<K, EV>(edge));
-        return new Graph<K, VV, EV>(this.getVertices(), newEdges, 
this.context);
+        return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
     }
     
     private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & 
Serializable, 
@@ -1028,58 +1118,9 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
                             VertexCentricIteration.withEdges(edges,
                                                vertexUpdateFunction, 
messagingFunction, maximumNumberOfIterations));
-               return new Graph<K, VV, EV>(newVertices, edges, context);
+               return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
     }
 
-       /**
-        * Creates a graph from the given vertex and edge collections
-        * @param context the flink execution environment.
-        * @param v the collection of vertices
-        * @param e the collection of edges
-        * @return a new graph formed from the set of edges and vertices
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, VV, EV>
-               fromCollection(ExecutionEnvironment context, 
Collection<Vertex<K, VV>> v,
-                                          Collection<Edge<K, EV>> e) throws 
Exception {
-
-               DataSet<Vertex<K, VV>> vertices = context.fromCollection(v);
-               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-
-               return Graph.create(vertices, edges, context);
-       }
-
-       /**
-        * Vertices may not have a value attached or may receive a value as a 
result of running the algorithm.
-        * @param context the flink execution environment.
-        * @param e the collection of edges
-        * @return a new graph formed from the edges, with no value for the 
vertices
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, NullValue, EV>
-               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e) {
-
-               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-
-               return Graph.create(edges, context);
-       }
-
-       /**
-        * Vertices may have an initial value defined by a function.
-        * @param context the flink execution environment.
-        * @param e the collection of edges
-        * @return a new graph formed from the edges, with a custom value for 
the vertices,
-        * determined by the mapping function
-        */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, VV, EV>
-               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e,
-                                          final MapFunction<K, VV> mapper) {
-
-               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-               return Graph.create(edges, mapper, context);
-       }
-
        public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {
                return algorithm.run(this);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/09d39ce6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 44f226f..668c765 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -23,7 +23,6 @@ import flink.graphs.Graph;
 import flink.graphs.Vertex;
 import flink.graphs.example.utils.MusicProfilesData;
 import flink.graphs.library.LabelPropagation;
-import flink.graphs.utils.Tuple3ToEdgeMap;
 
 @SuppressWarnings("serial")
 public class MusicProfiles implements ProgramDescription {
@@ -67,9 +66,7 @@ public class MusicProfiles implements ProgramDescription {
         *  Create a user -> song weighted bipartite graph
         *  where the edge weights correspond to play counts
         */
-       DataSet<Edge<String, Integer>> userSongEdges = validTriplets.map(new 
Tuple3ToEdgeMap<String, Integer>());
-
-       Graph<String, NullValue, Integer> userSongGraph = 
Graph.fromDataSet(userSongEdges, env);
+       Graph<String, NullValue, Integer> userSongGraph = 
Graph.fromTupleDataSet(validTriplets, env);
 
        /**
         *  Get the top track (most listened) for each user

Reply via email to