[FLINK-1201] [gelly] Updated JavaDoc for Graph class

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b511932d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b511932d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b511932d

Branch: refs/heads/master
Commit: b511932d795f5dd8e20a1cf13143aa149c89c368
Parents: 1d6a200
Author: Carsten Brandt <m...@cebe.cc>
Authored: Thu Jan 15 19:53:23 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 305 +++++++++++--------
 1 file changed, 180 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b511932d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index fb72a20..646c747 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -56,6 +56,17 @@ import flink.graphs.utils.GraphUtils;
 import flink.graphs.utils.Tuple2ToVertexMap;
 import flink.graphs.validation.GraphValidator;
 
+/**
+ * Represents a Graph consisting of {@link Edge edges} and {@link Vertex 
vertices}.
+ *
+ *
+ * @see flink.graphs.Edge
+ * @see flink.graphs.Vertex
+ *
+ * @param <K> the key type for edge and vertex identifiers
+ * @param <VV> the value type for vertices
+ * @param <EV> the value type for edges
+ */
 @SuppressWarnings("serial")
 public class Graph<K extends Comparable<K> & Serializable, VV extends 
Serializable,    EV extends Serializable> {
 
@@ -64,12 +75,27 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        private final DataSet<Edge<K, EV>> edges;
        private boolean isUndirected;
 
+       /**
+        * Creates a graph from two datasets: vertices and edges
+        *
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        */
        public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
 
                /** a graph is directed by default */
                this(vertices, edges, context, false);
        }
 
+       /**
+        * Creates a graph from two datasets: vertices and edges and allows 
setting the undirected property
+        *
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @param undirected whether this is an undirected graph
+        */
        public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context,
                        boolean undirected) {
                this.vertices = vertices;
@@ -78,29 +104,38 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                this.isUndirected = undirected;
        }
 
+       /**
+        * @return the flink execution environment.
+        */
        public ExecutionEnvironment getContext() {
                return this.context;
        }
 
        /**
         * Function that checks whether a graph's ids are valid
-        * @return
-        */
+        * @return a DataSet holding true if the graph's ids are valid, false if not.
+        */
        public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
                return validator.validate(this);
        }
 
+       /**
+        * @return the vertex dataset.
+        */
        public DataSet<Vertex<K, VV>> getVertices() {
                return vertices;
        }
 
+       /**
+        * @return the edge dataset.
+        */
        public DataSet<Edge<K, EV>> getEdges() {
                return edges;
        }
 
     /**
      * Apply a function to the attribute of each vertex in the graph.
-     * @param mapper
+     * @param mapper the map function to apply.
      * @return a new graph
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -124,11 +159,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
         return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), 
this.context);
     }
-    
+
     /**
      * Apply a function to the attribute of each edge in the graph.
-     * @param mapper
-     * @return 
+     * @param mapper the map function to apply.
+     * @return a new graph
      */
     @SuppressWarnings({ "unchecked", "rawtypes" })
        public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final 
MapFunction<Edge<K, EV>, NV> mapper) {
@@ -152,10 +187,10 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
     }
 
        /**
-        * Method that joins the vertex DataSet with an input DataSet and 
applies a UDF on the resulted values.
-        * @param inputDataSet
-        * @param mapper - the UDF applied
-        * @return - a new graph where the vertex values have been updated.
+        * Joins the vertex DataSet of this graph with an input DataSet and 
applies a UDF on the resulted values.
+        * @param inputDataSet the DataSet to join with.
+        * @param mapper the UDF map function to apply.
+        * @return a new graph where the vertex values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> 
inputDataSet,
                        final MapFunction<Tuple2<VV, T>, VV> mapper) {
@@ -196,12 +231,12 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Method that joins the edge DataSet with an input DataSet on a 
composite key of both source and target
+        * Joins the edge DataSet with an input DataSet on a composite key of 
both source and target
         * and applies a UDF on the resulted values.
-        * @param inputDataSet
-        * @param mapper - the UDF applied
+        * @param inputDataSet the DataSet to join with.
+        * @param mapper the UDF map function to apply.
         * @param <T>
-        * @return - a new graph where the edge values have been updated.
+        * @return a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet, 
                        final MapFunction<Tuple2<EV, T>, EV> mapper) {
@@ -243,13 +278,13 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Method that joins the edge DataSet with an input DataSet on the 
source key of the edges and the first attribute
+        * Joins the edge DataSet with an input DataSet on the source key of 
the edges and the first attribute
         * of the input DataSet and applies a UDF on the resulted values.
-        * Should the inputDataSet contain the same key more than once, only 
the first value will be considered.
-        * @param inputDataSet
-        * @param mapper - the UDF applied
+        * In case the inputDataSet contains the same key more than once, only 
the first value will be considered.
+        * @param inputDataSet the DataSet to join with.
+        * @param mapper the UDF map function to apply.
         * @param <T>
-        * @return - a new graph where the edge values have been updated.
+        * @return a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> 
inputDataSet,
                        final MapFunction<Tuple2<EV, T>, EV> mapper) {
@@ -295,13 +330,13 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Method that joins the edge DataSet with an input DataSet on the 
target key of the edges and the first attribute
+        * Joins the edge DataSet with an input DataSet on the target key of 
the edges and the first attribute
         * of the input DataSet and applies a UDF on the resulted values.
         * Should the inputDataSet contain the same key more than once, only 
the first value will be considered.
-        * @param inputDataSet
-        * @param mapper - the UDF applied
+        * @param inputDataSet the DataSet to join with.
+        * @param mapper the UDF map function to apply.
         * @param <T>
-        * @return - a new graph where the edge values have been updated.
+        * @return a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> 
inputDataSet,
                        final MapFunction<Tuple2<EV, T>, EV> mapper) {
@@ -314,14 +349,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-     * Apply value-based filtering functions to the graph 
-     * and return a sub-graph that satisfies the predicates
-     * for both vertex values and edge values.
-     * @param vertexFilter
-     * @param edgeFilter
-     * @return
-     */
-    public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> 
vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
+        * Apply filtering functions to the graph
+        * and return a sub-graph that satisfies the predicates
+        * for both vertices and edges.
+        * @param vertexFilter the filter function for vertices.
+        * @param edgeFilter the filter function for edges.
+        * @return the resulting sub-graph.
+        */
+       public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> 
vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
 
         DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
 
@@ -337,11 +372,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
     }
 
        /**
-        * Apply value-based filtering functions to the graph
+        * Apply a filtering function to the graph
         * and return a sub-graph that satisfies the predicates
         * only for the vertices.
-        * @param vertexFilter
-        * @return
+        * @param vertexFilter the filter function for vertices.
+        * @return the resulting sub-graph.
         */
        public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> 
vertexFilter) {
 
@@ -357,18 +392,18 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Apply value-based filtering functions to the graph
+        * Apply a filtering function to the graph
         * and return a sub-graph that satisfies the predicates
         * only for the edges.
-        * @param edgeFilter
-        * @return
+        * @param edgeFilter the filter function for edges.
+        * @return the resulting sub-graph.
         */
        public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> 
edgeFilter) {
                DataSet<Edge<K, EV>> filteredEdges = 
this.edges.filter(edgeFilter);
 
                return new Graph<K, VV, EV>(this.vertices, filteredEdges, 
this.context);
        }
-    
+
     @ConstantFieldsFirst("0->0;1->1;2->2")
     private static final class ProjectEdge<K extends Comparable<K> & 
Serializable, 
        VV extends Serializable, EV extends Serializable> implements 
FlatJoinFunction<Edge<K,EV>, Vertex<K,VV>, 
@@ -424,7 +459,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        /**
         * Convert the directed graph into an undirected graph
         * by adding all inverse-direction edges.
-        *
+        * @return the undirected graph.
         */
        public Graph<K, VV, EV> getUndirected() throws 
UnsupportedOperationException {
                if (this.isUndirected) {
@@ -644,9 +679,10 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
 
        /**
         * Creates a graph from a dataset of vertices and a dataset of edges
-        * @param vertices
-        * @param edges
-        * @return
+        * @param vertices a DataSet of vertices.
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph
         */
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
                EV extends Serializable> Graph<K, VV, EV>
@@ -658,8 +694,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        /**
         * Creates a graph from a DataSet of edges.
         * Vertices are created automatically and their values are set to 
NullValue.
-        * @param edges
-        * @return
+        * @param edges a DataSet of edges.
+        * @param context the flink execution environment.
+        * @return the newly created graph
         */
        public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> 
                Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, 
ExecutionEnvironment context) {
@@ -674,7 +711,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
         * by applying the provided map function to the vertex ids.
         * @param edges the input edges
         * @param mapper the map function to set the initial vertex value
-        * @return
+        * @return the newly created graph
         */
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> 
                Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final 
MapFunction<K, VV> mapper, 
@@ -718,19 +755,32 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Read and create the graph Tuple2 dataset from a csv file
-        * @param env
-        * @param filePath
-        * @param delimiter
-        * @param Tuple2IdClass
-        * @param Tuple2ValueClass
-        * @return
+        * Read and create the graph vertex Tuple2 DataSet from a csv file
+        *
+        * The CSV file should be of the following format:
+        *
+        * {@code <vertexID><delimiter><vertexValue>}
+        *
+        * For example, with space delimiter:
+        *
+        * 1 57
+        * 2 45
+        * 3 77
+        * 4 12
+        *
+        * @param context the flink execution environment.
+        * @param filePath the path to the CSV file.
+        * @param delimiter the CSV delimiter.
+        * @param Tuple2IdClass The class to use for Vertex IDs
+        * @param Tuple2ValueClass The class to use for Vertex Values
+        * @return a set of vertices and their values.
         */
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable>
-               DataSet<Tuple2<K, VV>> readTuple2CsvFile(ExecutionEnvironment 
env, String filePath,
+               DataSet<Tuple2<K, VV>>
+               readTuple2CsvFile(ExecutionEnvironment context, String filePath,
                        char delimiter, Class<K> Tuple2IdClass, Class<VV> 
Tuple2ValueClass) {
 
-               CsvReader reader = new CsvReader(filePath, env);
+               CsvReader reader = new CsvReader(filePath, context);
                DataSet<Tuple2<K, VV>> vertices = 
reader.fieldDelimiter(delimiter).types(Tuple2IdClass, Tuple2ValueClass)
                .map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() {
 
@@ -741,23 +791,21 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                return vertices;
        }
 
-    /**
-     * @return Singleton DataSet containing the vertex count
-     */
+       /**
+        * @return Singleton DataSet containing the vertex count
+        */
        public DataSet<Integer> numberOfVertices () {
         return GraphUtils.count(vertices, context);
     }
 
-    /**
-     *
-     * @return Singleton DataSet containing the edge count
-     */
+       /**
+        * @return Singleton DataSet containing the edge count
+        */
        public DataSet<Integer> numberOfEdges () {
         return GraphUtils.count(edges, context);
     }
 
     /**
-     *
      * @return The IDs of the vertices as DataSet
      */
     public DataSet<K> getVertexIds () {
@@ -772,6 +820,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
             }
     }
 
+    /**
+     * @return The IDs of the edges as DataSet
+     */
     public DataSet<Tuple2<K, K>> getEdgeIds () {
         return edges.map(new ExtractEdgeIDsMapper<K, EV>());
     }
@@ -784,11 +835,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
             }
     }
 
-    /**
-     * Checks the weak connectivity of a graph.
-     * @param maxIterations the maximum number of iterations for the inner 
delta iteration
-     * @return true if the graph is weakly connected.
-     */
+       /**
+        * Checks the weak connectivity of a graph.
+        * @param maxIterations the maximum number of iterations for the inner 
delta iteration
+        * @return true if the graph is weakly connected.
+        */
        public DataSet<Boolean> isWeaklyConnected (int maxIterations) {
                Graph<K, VV, EV> graph;
                
@@ -870,17 +921,17 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                DataSet<Edge<K, EV>> e = context.fromCollection(edges);
 
                return new Graph<K, VV, EV>(v, e, context);
-    }
+       }
 
-    /**
-     * Adds the input vertex and edges to the graph.
-     * If the vertex already exists in the graph, it will not be added again,
-     * but the given edges will. 
-     * @param vertex
-     * @param edges
-     * @return
-     */
-    @SuppressWarnings("unchecked")
+       /**
+        * Adds the input vertex and edges to the graph.
+        * If the vertex already exists in the graph, it will not be added 
again,
+        * but the given edges will.
+        * @param vertex the vertex to add to the graph
+        * @param edges a list of edges to add to the graph
+        * @return the new graph containing the existing and newly added 
vertices and edges
+        */
+       @SuppressWarnings("unchecked")
        public Graph<K, VV, EV> addVertex (final Vertex<K,VV> vertex, 
List<Edge<K, EV>> edges) {
        DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex);
 
@@ -896,14 +947,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        return Graph.create(newVertices, newEdges, context);
     }
 
-    /** 
+    /**
      * Adds the given edge to the graph.
      * If the source and target vertices do not exist in the graph,
      * they will also be added.
-     * @param source
-     * @param target
-     * @param edgeValue
-     * @return
+     * @param source the source vertex of the edge
+     * @param target the target vertex of the edge
+     * @param edgeValue the edge value
+     * @return the new graph containing the existing vertices and edges plus 
the newly added edge
      */
     public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, 
EV edgeValue) {
        Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, 
target),
@@ -911,12 +962,13 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         return this.union(partialGraph);
     }
 
-    /**
-     * Removes the given vertex and its edges from the graph.
-     * @param vertex
-     * @return
-     */
-    public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) {
+       /**
+        * Removes the given vertex and its edges from the graph.
+        * @param vertex the vertex to remove
+        * @return the new graph containing the existing vertices and edges 
without the removed vertex and its edges
+        */
+       public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) {
+
                DataSet<Vertex<K, VV>> newVertices = getVertices().filter(
                                new RemoveVertexFilter<K, VV>(vertex));
                DataSet<Edge<K, EV>> newEdges = getEdges().filter(
@@ -960,11 +1012,11 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
             return true;
         }
     }
-    
+
     /**
      * Removes all edges that match the given edge from the graph.
-     * @param edge
-     * @return
+     * @param edge the edge to remove
+     * @return the new graph containing the existing vertices and edges 
without the removed edges
      */
     public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) {
                DataSet<Edge<K, EV>> newEdges = getEdges().filter(
@@ -990,8 +1042,8 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
     /**
      * Performs union on the vertices and edges sets of the input graphs
      * removing duplicate vertices but maintaining duplicate edges.
-     * @param graph
-     * @return
+     * @param graph the graph to perform union with
+     * @return a new graph
      */
     public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) {
         DataSet<Vertex<K,VV>> unionedVertices = 
graph.getVertices().union(this.getVertices()).distinct();
@@ -999,14 +1051,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
         return new Graph<K,VV,EV>(unionedVertices, unionedEdges, this.context);
     }
 
-    /**
-     * Runs a Vertex-Centric iteration on the graph.
-     * @param vertexUpdateFunction
-     * @param messagingFunction
-     * @param maximumNumberOfIterations
-     * @return
-     */
-    @SuppressWarnings("unchecked")
+       /**
+        * Runs a Vertex-Centric iteration on the graph.
+        * @param vertexUpdateFunction the vertex update function
+        * @param messagingFunction the messaging function
+        * @param maximumNumberOfIterations maximum number of iterations to 
perform
+        * @return the graph resulting from the vertex-centric iteration
+        */
+       @SuppressWarnings("unchecked")
        public <M>Graph<K, VV, EV> 
runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
                MessagingFunction<K, VV, M, EV> messagingFunction, int 
maximumNumberOfIterations) {
        DataSet<Tuple2<K, VV>> tupleVertices = (DataSet<Tuple2<K, VV>>) 
(DataSet<?>) vertices;
@@ -1018,49 +1070,52 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
     }
 
        /**
-        * Creates a graph from the given vertex and edge collections
-        * @param env
-        * @param v the collection of vertices
-         * @param e the collection of edges
-         * @return a new graph formed from the set of edges and vertices
-         */
-        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, VV, EV> 
fromCollection(ExecutionEnvironment env, 
-                                       Collection<Vertex<K, VV>> v, 
Collection<Edge<K, EV>> e) throws Exception {
-               DataSet<Vertex<K, VV>> vertices = env.fromCollection(v);
-               DataSet<Edge<K, EV>> edges = env.fromCollection(e);
-
-               return Graph.create(vertices, edges, env);
+        * Creates a graph from the given vertex and edge collections
+        * @param context the flink execution environment.
+        * @param v the collection of vertices
+        * @param e the collection of edges
+        * @return a new graph formed from the set of edges and vertices
+        */
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
+                       EV extends Serializable> Graph<K, VV, EV>
+               fromCollection(ExecutionEnvironment context, 
Collection<Vertex<K, VV>> v,
+                                          Collection<Edge<K, EV>> e) throws 
Exception {
+
+               DataSet<Vertex<K, VV>> vertices = context.fromCollection(v);
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
+
+               return Graph.create(vertices, edges, context);
        }
 
        /**
         * Vertices may not have a value attached or may receive a value as a 
result of running the algorithm.
-        * @param env
+        * @param context the flink execution environment.
         * @param e the collection of edges
         * @return a new graph formed from the edges, with no value for the 
vertices
         */
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, NullValue, EV> 
fromCollection(ExecutionEnvironment env, 
-                                       Collection<Edge<K, EV>> e) {
+                       EV extends Serializable> Graph<K, NullValue, EV>
+               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e) {
 
-               DataSet<Edge<K, EV>> edges = env.fromCollection(e);
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
 
-               return Graph.create(edges, env);
+               return Graph.create(edges, context);
        }
 
        /**
         * Vertices may have an initial value defined by a function.
-        * @param env
+        * @param context the flink execution environment.
         * @param e the collection of edges
         * @return a new graph formed from the edges, with a custom value for 
the vertices,
         * determined by the mapping function
         */
        public static <K extends Comparable<K> & Serializable, VV extends 
Serializable,
-                       EV extends Serializable> Graph<K, VV, EV> 
fromCollection(ExecutionEnvironment env,
-                                                                               
                                                         Collection<Edge<K, 
EV>> e,
-                                                                               
                                                         final MapFunction<K, 
VV> mapper) {
-               DataSet<Edge<K, EV>> edges = env.fromCollection(e);
-               return Graph.create(edges, mapper, env);
+                       EV extends Serializable> Graph<K, VV, EV>
+               fromCollection(ExecutionEnvironment context, Collection<Edge<K, 
EV>> e,
+                                          final MapFunction<K, VV> mapper) {
+
+               DataSet<Edge<K, EV>> edges = context.fromCollection(e);
+               return Graph.create(edges, mapper, context);
        }
 
        public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {

Reply via email to