http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index a236478..59920a6 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs;
+package org.apache.flink.graph;
 
 import java.io.Serializable;
 import java.util.Arrays;
@@ -45,122 +45,128 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.graph.validation.GraphValidator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.types.NullValue;
 
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexCentricIteration;
-import flink.graphs.spargel.VertexUpdateFunction;
-import flink.graphs.utils.GraphUtils;
-import flink.graphs.utils.Tuple2ToVertexMap;
-import flink.graphs.utils.Tuple3ToEdgeMap;
-import flink.graphs.validation.GraphValidator;
-
 /**
- * Represents a Graph consisting of {@link Edge edges} and {@link Vertex 
vertices}.
- *
- *
- * @see flink.graphs.Edge
- * @see flink.graphs.Vertex
- *
+ * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
+ * vertices}.
+ * 
+ * 
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ * 
  * @param <K> the key type for edge and vertex identifiers
  * @param <VV> the value type for vertexes
  * @param <EV> the value type for edges
  */
 @SuppressWarnings("serial")
-public class Graph<K extends Comparable<K> & Serializable, VV extends 
Serializable,    EV extends Serializable> {
+public class Graph<K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> {
 
-    private final ExecutionEnvironment context;
+       private final ExecutionEnvironment context;
        private final DataSet<Vertex<K, VV>> vertices;
        private final DataSet<Edge<K, EV>> edges;
 
        /**
-        * Creates a graph from two DataSets: vertices and edges and allow 
setting the undirected property
-        *
+        * Creates a graph from two DataSets, vertices and edges, together with
+        * the execution environment.
+        * 
         * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
+        * @param edges a DataSet of edges.
         * @param context the flink execution environment.
         */
        private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges, ExecutionEnvironment context) {
                this.vertices = vertices;
                this.edges = edges;
-        this.context = context;
+               this.context = context;
        }
 
        /**
         * Creates a graph from a Collection of vertices and a Collection of 
edges.
+        * 
         * @param vertices a Collection of vertices.
-        * @param edges a Collection of vertices.
+        * @param edges a Collection of edges.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> 
vertices,
-                                                                               
 Collection<Edge<K,EV>> edges,
-                                                                               
 ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection(
+                       Collection<Vertex<K, VV>> vertices, Collection<Edge<K, 
EV>> edges,
+                       ExecutionEnvironment context) {
 
-               return fromDataSet(context.fromCollection(vertices), 
context.fromCollection(edges), context);
+               return fromDataSet(context.fromCollection(vertices),
+                               context.fromCollection(edges), context);
        }
 
        /**
-        * Creates a graph from a Collection of edges, vertices are induced 
from the edges.
-        * Vertices are created automatically and their values are set to 
NullValue.
+        * Creates a graph from a Collection of edges, vertices are induced 
from the
+        * edges. Vertices are created automatically and their values are set to
+        * NullValue.
+        * 
         * @param edges a Collection of edges.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> 
edges, ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> Graph<K, NullValue, EV> fromCollection(
+                       Collection<Edge<K, EV>> edges, ExecutionEnvironment 
context) {
 
                return fromDataSet(context.fromCollection(edges), context);
        }
 
        /**
-        * Creates a graph from a Collection of edges, vertices are induced 
from the edges and
-        * vertex values are calculated by a mapper function.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a Collection of vertices.
+        * Creates a graph from a Collection of edges, vertices are induced 
from the
+        * edges and vertex values are calculated by a mapper function. 
Vertices are
+        * created automatically and their values are set by applying the 
provided
+        * map function to the vertex ids.
+        * 
+        * @param edges a Collection of edges.
         * @param mapper the mapper function.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges,
-                                                                               
 final MapFunction<K, VV> mapper,
-                                                                               
 ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromCollection(
+                       Collection<Edge<K, EV>> edges, final MapFunction<K, VV> 
mapper,
+                       ExecutionEnvironment context) {
 
                return fromDataSet(context.fromCollection(edges), mapper, 
context);
        }
 
        /**
         * Creates a graph from a DataSet of vertices and a DataSet of edges.
+        * 
         * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
+        * @param edges a DataSet of edges.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices,
-                                                                         
DataSet<Edge<K,EV>> edges,
-                                                                         
ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet(
+                       DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> 
edges,
+                       ExecutionEnvironment context) {
 
                return new Graph<K, VV, EV>(vertices, edges, context);
        }
 
        /**
-        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        * @param edges a DataSet of vertices.
+        * Creates a graph from a DataSet of edges, vertices are induced from 
the
+        * edges. Vertices are created automatically and their values are set to
+        * NullValue.
+        * 
+        * @param edges a DataSet of edges.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
-                                                                               
         ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> Graph<K, NullValue, EV> fromDataSet(
+                       DataSet<Edge<K, EV>> edges, ExecutionEnvironment 
context) {
 
-               DataSet<Vertex<K, NullValue>> vertices =
-                               edges.flatMap(new EmitSrcAndTarget<K, 
EV>()).distinct();
+               DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new 
EmitSrcAndTarget<K, EV>()).distinct();
 
                return new Graph<K, NullValue, EV>(vertices, edges, context);
        }
@@ -175,44 +181,41 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Creates a graph from a DataSet of edges, vertices are induced from 
the edges and
-        * vertex values are calculated by a mapper function.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a DataSet of vertices.
+        * Creates a graph from a DataSet of edges, vertices are induced from 
the
+        * edges and vertex values are calculated by a mapper function. 
Vertices are
+        * created automatically and their values are set by applying the 
provided
+        * map function to the vertex ids.
+        * 
+        * @param edges a DataSet of edges.
         * @param mapper the mapper function.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
-                                                                         final 
MapFunction<K, VV> mapper,
-                                                                         
ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromDataSet(
+                       DataSet<Edge<K, EV>> edges, final MapFunction<K, VV> 
mapper, ExecutionEnvironment context) {
 
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
 
-               TypeInformation<VV> valueType = TypeExtractor
-                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+               TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
+                               MapFunction.class, mapper.getClass(), 1, null, 
null);
 
                @SuppressWarnings({ "unchecked", "rawtypes" })
-               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>)
-                               new TupleTypeInfo(Vertex.class, keyType, 
valueType);
-
-               DataSet<Vertex<K, VV>> vertices =
-                               edges.flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>())
-                                    .distinct()
-                                    .map(new MapFunction<Tuple1<K>, Vertex<K, 
VV>>() {
-                                                public Vertex<K, VV> 
map(Tuple1<K> value) throws Exception {
-                                                        return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
-                                                }
-                                        })
-                                    .returns(returnType);
+               TypeInformation<Vertex<K, VV>> returnType = 
(TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
+                               Vertex.class, keyType, valueType);
+
+               DataSet<Vertex<K, VV>> vertices = edges
+                               .flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>()).distinct()
+                               .map(new MapFunction<Tuple1<K>, Vertex<K, 
VV>>() {
+                                       public Vertex<K, VV> map(Tuple1<K> 
value) throws Exception {
+                                               return new Vertex<K, 
VV>(value.f0, mapper.map(value.f0));
+                                       }
+                               }).returns(returnType);
 
                return new Graph<K, VV, EV>(vertices, edges, context);
        }
 
-       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable,
-                       EV extends Serializable> implements 
FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+       private static final class EmitSrcAndTargetAsTuple1<K extends 
Comparable<K> & Serializable, EV extends Serializable>
+                       implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
                        out.collect(new Tuple1<K>(edge.f0));
@@ -222,19 +225,17 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
        /**
         * Creates a graph from a DataSet of Tuple objects for vertices and 
edges.
-        *
-        * Vertices with value are created from Tuple2,
-        * Edges with value are created from Tuple3.
-        *
-        * @param vertices a DataSet of vertices.
-        * @param edges a DataSet of vertices.
+        * 
+        * Vertices with value are created from Tuple2, Edges with value are 
created
+        * from Tuple3.
+        * 
+        * @param vertices a DataSet of Tuple2.
+        * @param edges a DataSet of Tuple3.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-               Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple2<K, VV>> 
vertices,
-                                                                               
   DataSet<Tuple3<K, K, EV>> edges,
-                                                                               
   ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet(
+                       DataSet<Tuple2<K, VV>> vertices, DataSet<Tuple3<K, K, 
EV>> edges, ExecutionEnvironment context) {
 
                DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new 
Tuple2ToVertexMap<K, VV>());
                DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
@@ -242,38 +243,37 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges.
-        *
-        * Edges with value are created from Tuple3.
-        * Vertices are created automatically and their values are set to 
NullValue.
-        *
-        * @param edges a DataSet of vertices.
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are
+        * induced from the edges.
+        * 
+        * Edges with value are created from Tuple3. Vertices are created
+        * automatically and their values are set to NullValue.
+        * 
+        * @param edges a DataSet of Tuple3.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable>
-               Graph<K, NullValue, EV> fromTupleDataSet (DataSet<Tuple3<K, K, 
EV>> edges,
-                                                                               
                  ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, EV extends 
Serializable> Graph<K, NullValue, EV> fromTupleDataSet(
+                       DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment 
context) {
 
                DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
                return fromDataSet(edgeDataSet, context);
        }
 
        /**
-        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are induced from the edges and
-        * vertex values are calculated by a mapper function.
-        * Edges with value are created from Tuple3.
-        * Vertices are created automatically and their values are set
-        * by applying the provided map function to the vertex ids.
-        * @param edges a DataSet of vertices.
+        * Creates a graph from a DataSet of Tuple objects for edges, vertices 
are
+        * induced from the edges and vertex values are calculated by a mapper
+        * function. Edges with value are created from Tuple3. Vertices are 
created
+        * automatically and their values are set by applying the provided map
+        * function to the vertex ids.
+        * 
+        * @param edges a DataSet of Tuple3.
         * @param mapper the mapper function.
         * @param context the flink execution environment.
         * @return the newly created graph.
         */
-       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable>
-       Graph<K, VV, EV> fromTupleDataSet (DataSet<Tuple3<K, K, EV>> edges,
-                                                                          
final MapFunction<K, VV> mapper,
-                                                                          
ExecutionEnvironment context) {
+       public static <K extends Comparable<K> & Serializable, VV extends 
Serializable, EV extends Serializable> Graph<K, VV, EV> fromTupleDataSet(
+                       DataSet<Tuple3<K, K, EV>> edges, final MapFunction<K, 
VV> mapper, ExecutionEnvironment context) {
 
                DataSet<Edge<K, EV>> edgeDataSet = edges.map(new 
Tuple3ToEdgeMap<K, EV>());
                return fromDataSet(edgeDataSet, mapper, context);
@@ -288,6 +288,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
 
        /**
         * Function that checks whether a graph's ids are valid
+        * 
         * @return true if the graph's ids are valid, false if not.
         */
        public DataSet<Boolean> validate(GraphValidator<K, VV, EV> validator) {
@@ -311,79 +312,79 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        /**
         * @return the vertex DataSet as Tuple2.
         */
-       @SuppressWarnings({ "unchecked" })
        public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
-               return (DataSet<Tuple2<K, VV>>) (DataSet<?>) vertices;
+               return vertices.map(new VertexToTuple2Map<K, VV>());
        }
 
        /**
         * @return the edge DataSet as Tuple3.
         */
-       @SuppressWarnings({ "unchecked" })
        public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
-               return (DataSet<Tuple3<K, K, EV>>) (DataSet<?>) edges;
+               return edges.map(new EdgeToTuple3Map<K, EV>());
        }
 
-    /**
-     * Apply a function to the attribute of each vertex in the graph.
-     * @param mapper the map function to apply.
-     * @return a new graph
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+       /**
+        * Apply a function to the attribute of each vertex in the graph.
+        * 
+        * @param mapper the map function to apply.
+        * @return a new graph
+        */
+       @SuppressWarnings({ "unchecked", "rawtypes" })
        public <NV extends Serializable> Graph<K, NV, EV> mapVertices(final 
MapFunction<Vertex<K, VV>, NV> mapper) {
 
-       TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertices.getType()).getTypeAt(0);
 
-       TypeInformation<NV> valueType = TypeExtractor
-                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, 
null);
 
-               TypeInformation<Vertex<K, NV>> returnType = 
(TypeInformation<Vertex<K, NV>>)
-                                       new TupleTypeInfo(Vertex.class, 
keyType, valueType);
+               TypeInformation<Vertex<K, NV>> returnType = 
(TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
+                               Vertex.class, keyType, valueType);
 
-       DataSet<Vertex<K, NV>> mappedVertices = vertices
-                       .map(new MapFunction<Vertex<K,VV>, Vertex<K, NV>>() {
-                               public Vertex<K, NV> map(Vertex<K, VV> value) 
throws Exception {
-                                       return new Vertex<K, NV>(value.f0, 
mapper.map(value));
-                               }
-                               })
-                       .returns(returnType);
+               DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
+                               new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() 
{
+                                       public Vertex<K, NV> map(Vertex<K, VV> 
value) throws Exception {
+                                               return new Vertex<K, 
NV>(value.f0, mapper.map(value));
+                                       }
+                               }).returns(returnType);
 
-        return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
-    }
+               return new Graph<K, NV, EV>(mappedVertices, this.edges, 
this.context);
+       }
 
-    /**
-     * Apply a function to the attribute of each edge in the graph.
-     * @param mapper the map function to apply.
-     * @return a new graph
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+       /**
+        * Apply a function to the attribute of each edge in the graph.
+        * 
+        * @param mapper the map function to apply.
+        * @return a new graph
+        */
+       @SuppressWarnings({ "unchecked", "rawtypes" })
        public <NV extends Serializable> Graph<K, VV, NV> mapEdges(final 
MapFunction<Edge<K, EV>, NV> mapper) {
 
-       TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
+               TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
edges.getType()).getTypeAt(0);
 
-       TypeInformation<NV> valueType = TypeExtractor
-                               .createTypeInfo(MapFunction.class, 
mapper.getClass(), 1, null, null);
+               TypeInformation<NV> valueType = 
TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, 
null);
 
-               TypeInformation<Edge<K, NV>> returnType = 
(TypeInformation<Edge<K, NV>>)
-                               new TupleTypeInfo(Edge.class, keyType, keyType, 
valueType);
+               TypeInformation<Edge<K, NV>> returnType = 
(TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
+                               Edge.class, keyType, keyType, valueType);
 
-       DataSet<Edge<K, NV>> mappedEdges = edges.map(new MapFunction<Edge<K, 
EV>, Edge<K, NV>>() {
-                       public Edge<K, NV> map(Edge<K, EV> value) throws 
Exception {
-                               return new Edge<K, NV>(value.f0, value.f1, 
mapper.map(value));
-                       }
-               })
-               .returns(returnType);
+               DataSet<Edge<K, NV>> mappedEdges = edges.map(
+                               new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
+                                       public Edge<K, NV> map(Edge<K, EV> 
value) throws Exception {
+                                               return new Edge<K, 
NV>(value.f0, value.f1, mapper
+                                                               .map(value));
+                                       }
+                               }).returns(returnType);
 
-        return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
-    }
+               return new Graph<K, VV, NV>(this.vertices, mappedEdges, 
this.context);
+       }
 
        /**
-        * Joins the vertex DataSet of this graph with an input DataSet and 
applies a UDF on the resulted values.
+        * Joins the vertex DataSet of this graph with an input DataSet and 
applies
+        * a UDF on the resulted values.
+        * 
         * @param inputDataSet the DataSet to join with.
         * @param mapper the UDF map function to apply.
         * @return a new graph where the vertex values have been updated.
         */
-       public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> 
inputDataSet,
+       public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> 
inputDataSet, 
                        final MapFunction<Tuple2<VV, T>, VV> mapper) {
 
                DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
@@ -392,75 +393,78 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                return new Graph<K, VV, EV>(resultedVertices, this.edges, 
this.context);
        }
 
-       private static final class ApplyCoGroupToVertexValues<K extends 
Comparable<K> & Serializable,
-                       VV extends Serializable, T>     implements 
CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
+       private static final class ApplyCoGroupToVertexValues<K extends 
Comparable<K> & Serializable, VV extends Serializable, T>
+                       implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, 
Vertex<K, VV>> {
 
                private MapFunction<Tuple2<VV, T>, VV> mapper;
+
                public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, 
VV> mapper) {
                        this.mapper = mapper;
                }
 
                @Override
-               public void coGroup(Iterable<Vertex<K, VV>> vertices, 
Iterable<Tuple2<K, T>> input,
-                                                       Collector<Vertex<K, 
VV>> collector) throws Exception {
+               public void coGroup(Iterable<Vertex<K, VV>> vertices,
+                               Iterable<Tuple2<K, T>> input, 
Collector<Vertex<K, VV>> collector) throws Exception {
 
                        final Iterator<Vertex<K, VV>> vertexIterator = 
vertices.iterator();
                        final Iterator<Tuple2<K, T>> inputIterator = 
input.iterator();
 
                        if (vertexIterator.hasNext()) {
-                               if(inputIterator.hasNext()) {
+                               if (inputIterator.hasNext()) {
                                        final Tuple2<K, T> inputNext = 
inputIterator.next();
 
                                        collector.collect(new Vertex<K, 
VV>(inputNext.f0, mapper
-                                                       .map(new Tuple2<VV, 
T>(vertexIterator.next().f1, inputNext.f1))));
+                                                       .map(new Tuple2<VV, 
T>(vertexIterator.next().f1,
+                                                                       
inputNext.f1))));
                                } else {
                                        
collector.collect(vertexIterator.next());
                                }
-                               
+
                        }
                }
        }
 
        /**
-        * Joins the edge DataSet with an input DataSet on a composite key of 
both source and target
-        * and applies a UDF on the resulted values.
+        * Joins the edge DataSet with an input DataSet on a composite key of 
both
+        * source and target and applies a UDF on the resulted values.
+        * 
         * @param inputDataSet the DataSet to join with.
         * @param mapper the UDF map function to apply.
-        * @param <T>
+        * @param <T> the type of the third field of the input DataSet's tuples
         * @return a new graph where the edge values have been updated.
         */
-       public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet, 
+       public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> 
inputDataSet,
                        final MapFunction<Tuple2<EV, T>, EV> mapper) {
 
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
-                               .coGroup(inputDataSet).where(0,1).equalTo(0,1)
+                               .coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
                                .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(mapper));
                return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
        }
 
-       private static final class ApplyCoGroupToEdgeValues<K extends 
Comparable<K> & Serializable,
-                       EV extends Serializable, T>
+       private static final class ApplyCoGroupToEdgeValues<K extends 
Comparable<K> & Serializable, EV extends Serializable, T>
                        implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, 
T>, Edge<K, EV>> {
 
                private MapFunction<Tuple2<EV, T>, EV> mapper;
+
                public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> 
mapper) {
                        this.mapper = mapper;
                }
 
                @Override
-               public void coGroup(Iterable<Edge<K, EV>> edges,
-                                                       Iterable<Tuple3<K, K, 
T>> input,
-                                                       Collector<Edge<K, EV>> 
collector) throws Exception {
+               public void coGroup(Iterable<Edge<K, EV>> edges, 
Iterable<Tuple3<K, K, T>> input,
+                               Collector<Edge<K, EV>> collector) throws 
Exception {
 
                        final Iterator<Edge<K, EV>> edgesIterator = 
edges.iterator();
                        final Iterator<Tuple3<K, K, T>> inputIterator = 
input.iterator();
 
                        if (edgesIterator.hasNext()) {
-                               if(inputIterator.hasNext()) {
+                               if (inputIterator.hasNext()) {
                                        final Tuple3<K, K, T> inputNext = 
inputIterator.next();
 
-                                       collector.collect(new Edge<K, 
EV>(inputNext.f0, inputNext.f1, mapper
-                                                       .map(new Tuple2<EV, 
T>(edgesIterator.next().f2, inputNext.f2))));
+                                       collector.collect(new Edge<K, 
EV>(inputNext.f0,
+                                                       inputNext.f1, 
mapper.map(new Tuple2<EV, T>(
+                                                                       
edgesIterator.next().f2, inputNext.f2))));
                                } else {
                                        collector.collect(edgesIterator.next());
                                }
@@ -469,12 +473,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Joins the edge DataSet with an input DataSet on the source key of 
the edges and the first attribute
-        * of the input DataSet and applies a UDF on the resulted values.
-        * In case the inputDataSet contains the same key more than once, only 
the first value will be considered.
+        * Joins the edge DataSet with an input DataSet on the source key of the
+        * edges and the first attribute of the input DataSet and applies a UDF 
on
+        * the resulting values. In case the inputDataSet contains the same key 
more
+        * than once, only the first value will be considered.
+        * 
         * @param inputDataSet the DataSet to join with.
         * @param mapper the UDF map function to apply.
-        * @param <T>
+        * @param <T> the type of the second field of the input Tuple2 DataSet
         * @return a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> 
inputDataSet,
@@ -487,33 +493,36 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
        }
 
-       private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & 
Serializable,
-                       EV extends Serializable, T>     implements 
CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
+       private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K> & 
Serializable, EV extends Serializable, T>
+                       implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, 
Edge<K, EV>> {
 
                private MapFunction<Tuple2<EV, T>, EV> mapper;
-               public 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>, EV> 
mapper) {
+
+               public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
+                               MapFunction<Tuple2<EV, T>, EV> mapper) {
                        this.mapper = mapper;
                }
 
                @Override
-               public void coGroup(Iterable<Edge<K, EV>> edges, 
Iterable<Tuple2<K, T>> input,
-                                                       Collector<Edge<K, EV>> 
collector) throws Exception {
+               public void coGroup(Iterable<Edge<K, EV>> edges,
+                               Iterable<Tuple2<K, T>> input, Collector<Edge<K, 
EV>> collector) throws Exception {
 
                        final Iterator<Edge<K, EV>> edgesIterator = 
edges.iterator();
                        final Iterator<Tuple2<K, T>> inputIterator = 
input.iterator();
 
-                       if(inputIterator.hasNext()) {
+                       if (inputIterator.hasNext()) {
                                final Tuple2<K, T> inputNext = 
inputIterator.next();
 
-                               while(edgesIterator.hasNext()) {
+                               while (edgesIterator.hasNext()) {
                                        Edge<K, EV> edgesNext = 
edgesIterator.next();
 
-                                       collector.collect(new Edge<K, 
EV>(edgesNext.f0, edgesNext.f1, mapper
-                                                       .map(new Tuple2<EV, 
T>(edgesNext.f2, inputNext.f1))));
+                                       collector.collect(new Edge<K, 
EV>(edgesNext.f0,
+                                                       edgesNext.f1, 
mapper.map(new Tuple2<EV, T>(
+                                                                       
edgesNext.f2, inputNext.f1))));
                                }
 
                        } else {
-                               while(edgesIterator.hasNext()) {
+                               while (edgesIterator.hasNext()) {
                                        collector.collect(edgesIterator.next());
                                }
                        }
@@ -521,12 +530,14 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Joins the edge DataSet with an input DataSet on the target key of 
the edges and the first attribute
-        * of the input DataSet and applies a UDF on the resulted values.
-        * Should the inputDataSet contain the same key more than once, only 
the first value will be considered.
+        * Joins the edge DataSet with an input DataSet on the target key of the
+        * edges and the first attribute of the input DataSet and applies a UDF 
on
+        * the resulting values. Should the inputDataSet contain the same key 
more
+        * than once, only the first value will be considered.
+        * 
         * @param inputDataSet the DataSet to join with.
         * @param mapper the UDF map function to apply.
-        * @param <T>
+        * @param <T> the type of the second field of the input Tuple2 DataSet
         * @return a new graph where the edge values have been updated.
         */
        public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> 
inputDataSet,
@@ -540,32 +551,32 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Apply filtering functions to the graph
-        * and return a sub-graph that satisfies the predicates
-        * for both vertices and edges.
+        * Apply filtering functions to the graph and return a sub-graph that
+        * satisfies the predicates for both vertices and edges.
+        * 
         * @param vertexFilter the filter function for vertices.
         * @param edgeFilter the filter function for edges.
         * @return the resulting sub-graph.
         */
        public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> 
vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
 
-        DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
+               DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
 
-        DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
-                       .where(0).equalTo(0)
-                       .with(new ProjectEdge<K, VV, EV>())
-                       .join(filteredVertices).where(1).equalTo(0)
-                       .with(new ProjectEdge<K, VV, EV>());
+               DataSet<Edge<K, EV>> remainingEdges = 
this.edges.join(filteredVertices)
+                               .where(0).equalTo(0).with(new ProjectEdge<K, 
VV, EV>())
+                               .join(filteredVertices).where(1).equalTo(0)
+                               .with(new ProjectEdge<K, VV, EV>());
 
-        DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
+               DataSet<Edge<K, EV>> filteredEdges = 
remainingEdges.filter(edgeFilter);
 
-        return new Graph<K, VV, EV>(filteredVertices, filteredEdges, 
this.context);
-    }
+               return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
+                               this.context);
+       }
 
        /**
-        * Apply a filtering function to the graph
-        * and return a sub-graph that satisfies the predicates
-        * only for the vertices.
+        * Apply a filtering function to the graph and return a sub-graph that
+        * satisfies the predicates only for the vertices.
+        * 
         * @param vertexFilter the filter function for vertices.
         * @return the resulting sub-graph.
         */
@@ -574,8 +585,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                DataSet<Vertex<K, VV>> filteredVertices = 
this.vertices.filter(vertexFilter);
 
                DataSet<Edge<K, EV>> remainingEdges = 
this.edges.join(filteredVertices)
-                               .where(0).equalTo(0)
-                               .with(new ProjectEdge<K, VV, EV>())
+                               .where(0).equalTo(0).with(new ProjectEdge<K, 
VV, EV>())
                                .join(filteredVertices).where(1).equalTo(0)
                                .with(new ProjectEdge<K, VV, EV>());
 
@@ -583,9 +593,9 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
        }
 
        /**
-        * Apply a filtering function to the graph
-        * and return a sub-graph that satisfies the predicates
-        * only for the edges.
+        * Apply a filtering function to the graph and return a sub-graph that
+        * satisfies the predicates only for the edges.
+        * 
         * @param edgeFilter the filter function for edges.
         * @return the resulting sub-graph.
         */
@@ -595,32 +605,29 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                return new Graph<K, VV, EV>(this.vertices, filteredEdges, 
this.context);
        }
 
-    @ConstantFieldsFirst("0->0;1->1;2->2")
-    private static final class ProjectEdge<K extends Comparable<K> & 
Serializable, 
-       VV extends Serializable, EV extends Serializable> implements 
FlatJoinFunction<Edge<K,EV>, Vertex<K,VV>, 
-               Edge<K,EV>> {
-               public void join(Edge<K, EV> first,
-                               Vertex<K, VV> second, Collector<Edge<K, EV>> 
out) {
+       @ConstantFieldsFirst("0->0;1->1;2->2")
+       private static final class ProjectEdge<K extends Comparable<K> & 
Serializable, VV extends Serializable, EV extends Serializable>
+                       implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, 
Edge<K, EV>> {
+               public void join(Edge<K, EV> first, Vertex<K, VV> second, 
Collector<Edge<K, EV>> out) {
                        out.collect(first);
                }
-    }
-    
-    /**
-     * Return the out-degree of all vertices in the graph
-     * @return A DataSet of Tuple2<vertexId, outDegree>
-     */
+       }
+
+       /**
+        * Return the out-degree of all vertices in the graph
+        * 
+        * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
+        */
        public DataSet<Tuple2<K, Long>> outDegrees() {
 
-               return vertices.coGroup(edges).where(0).equalTo(0)
-                               .with(new CountNeighborsCoGroup<K, VV, EV>());
+               return vertices.coGroup(edges).where(0).equalTo(0).with(new 
CountNeighborsCoGroup<K, VV, EV>());
        }
 
-       private static final class CountNeighborsCoGroup<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable> implements 
CoGroupFunction<Vertex<K, VV>, 
-               Edge<K, EV>, Tuple2<K, Long>> {
+       private static final class CountNeighborsCoGroup<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+                       implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, 
Tuple2<K, Long>> {
                @SuppressWarnings("unused")
-               public void coGroup(Iterable<Vertex<K, VV>> vertex,
-                               Iterable<Edge<K, EV>> outEdges, 
Collector<Tuple2<K, Long>> out) {
+               public void coGroup(Iterable<Vertex<K, VV>> vertex,     
Iterable<Edge<K, EV>> outEdges,
+                               Collector<Tuple2<K, Long>> out) {
                        long count = 0;
                        for (Edge<K, EV> edge : outEdges) {
                                count++;
@@ -628,19 +635,20 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                        out.collect(new Tuple2<K, 
Long>(vertex.iterator().next().f0, count));
                }
        }
-       
+
        /**
         * Return the in-degree of all vertices in the graph
+        * 
         * @return A DataSet of Tuple2<vertexId, inDegree>
         */
        public DataSet<Tuple2<K, Long>> inDegrees() {
 
-               return vertices.coGroup(edges).where(0).equalTo(1)
-                               .with(new CountNeighborsCoGroup<K, VV, EV>());
+               return vertices.coGroup(edges).where(0).equalTo(1).with(new 
CountNeighborsCoGroup<K, VV, EV>());
        }
 
        /**
         * Return the degree of all vertices in the graph
+        * 
         * @return A DataSet of Tuple2<vertexId, degree>
         */
        public DataSet<Tuple2<K, Long>> getDegrees() {
@@ -648,22 +656,26 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * This operation adds all inverse-direction edges
-        * to the graph.
+        * This operation adds all inverse-direction edges to the graph.
+        * 
         * @return the undirected graph.
         */
-       public Graph<K, VV, EV> getUndirected() throws 
UnsupportedOperationException {
-                       DataSet<Edge<K, EV>> undirectedEdges =
-                                       edges.union(edges.map(new 
ReverseEdgesMap<K, EV>()));
-                       return new Graph<K, VV, EV>(vertices, undirectedEdges, 
this.context);
+       public Graph<K, VV, EV> getUndirected() {
+
+               DataSet<Edge<K, EV>> undirectedEdges = 
edges.union(edges.map(new ReverseEdgesMap<K, EV>()));
+               return new Graph<K, VV, EV>(vertices, undirectedEdges, 
this.context);
        }
-       
+
        /**
-        * Compute an aggregate over the edges of each vertex.
-        * The function applied on the edges has access to the vertex value.
-        * @param edgesFunction the function to apply to the neighborhood
-        * @param direction the edge direction (in-, out-, all-)
-        * @param <T> the output type 
+        * Compute an aggregate over the edges of each vertex. The function 
applied
+        * on the edges has access to the vertex value.
+        * 
+        * @param edgesFunction
+        *            the function to apply to the neighborhood
+        * @param direction
+        *            the edge direction (in-, out-, all-)
+        * @param <T>
+        *            the output type
         * @return a dataset of a T
         * @throws IllegalArgumentException
         */
@@ -672,27 +684,29 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
                switch (direction) {
                case IN:
-                       return vertices.coGroup(edges).where(0).equalTo(1).with(
-                                       new ApplyCoGroupFunction<K, VV, EV, 
T>(edgesFunction));
+                       return vertices.coGroup(edges).where(0).equalTo(1)
+                                       .with(new ApplyCoGroupFunction<K, VV, 
EV, T>(edgesFunction));
                case OUT:
-                       return vertices.coGroup(edges).where(0).equalTo(0).with(
-                                       new ApplyCoGroupFunction<K, VV, EV, 
T>(edgesFunction));
+                       return vertices.coGroup(edges).where(0).equalTo(0)
+                                       .with(new ApplyCoGroupFunction<K, VV, 
EV, T>(edgesFunction));
                case ALL:
                        return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, VV, EV>()))
-                                       .where(0).equalTo(0)
-                                       .with(new 
ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
+                                       .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
        }
 
        /**
-        * Compute an aggregate over the edges of each vertex.
-        * The function applied on the edges only has access to the vertex id
-        * (not the vertex value).
-        * @param edgesFunction the function to apply to the neighborhood
-        * @param direction the edge direction (in-, out-, all-)
-        * @param <T> the output type
+        * Compute an aggregate over the edges of each vertex. The function 
applied
+        * on the edges only has access to the vertex id (not the vertex value).
+        * 
+        * @param edgesFunction
+        *            the function to apply to the neighborhood
+        * @param direction
+        *            the edge direction (in-, out-, all-)
+        * @param <T>
+        *            the output type
         * @return a dataset of T
         * @throws IllegalArgumentException
         */
@@ -707,15 +721,15 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                        return edges.map(new ProjectVertexIdMap<K, EV>(0))
                                        .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
                case ALL:
-                       return edges.flatMap(new EmitOneEdgePerNode<K, VV, 
EV>()).groupBy(0)
-                                       .reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+                       return edges.flatMap(new EmitOneEdgePerNode<K, VV, 
EV>())
+                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
        }
 
-       private static final class ProjectVertexIdMap<K extends Comparable<K> & 
Serializable, 
-               EV extends Serializable> implements MapFunction<Edge<K, EV>, 
Tuple2<K, Edge<K, EV>>> {
+       private static final class ProjectVertexIdMap<K extends Comparable<K> & 
Serializable, EV extends Serializable>
+                       implements MapFunction<Edge<K, EV>, Tuple2<K, Edge<K, 
EV>>> {
 
                private int fieldPosition;
 
@@ -725,13 +739,12 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
                @SuppressWarnings("unchecked")
                public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
-                       return new Tuple2<K, Edge<K, EV>>((K) 
edge.getField(fieldPosition), edge);
+                       return new Tuple2<K, Edge<K, EV>>((K) 
edge.getField(fieldPosition),     edge);
                }
        }
 
-       private static final class ApplyGroupReduceFunction<K extends 
Comparable<K> & Serializable, 
-               EV extends Serializable, T> implements 
GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>,
-               ResultTypeQueryable<T> {
+       private static final class ApplyGroupReduceFunction<K extends 
Comparable<K> & Serializable, EV extends Serializable, T>
+                       implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, 
T>,      ResultTypeQueryable<T> {
 
                private EdgesFunction<K, EV, T> function;
 
@@ -739,8 +752,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
                        this.function = fun;
                }
 
-               public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges,
-                               Collector<T> out) throws Exception {
+               public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, 
Collector<T> out) throws Exception {
                        out.collect(function.iterateEdges(edges));
                }
 
@@ -750,34 +762,31 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                }
        }
 
-       private static final class EmitOneEdgePerNode<K extends Comparable<K> & 
Serializable, 
-               VV extends Serializable, EV extends Serializable> implements 
FlatMapFunction<
-               Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
+       private static final class EmitOneEdgePerNode<K extends Comparable<K> & 
Serializable, VV extends Serializable, EV extends Serializable>
+                       implements FlatMapFunction<Edge<K, EV>, Tuple2<K, 
Edge<K, EV>>> {
                public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, 
Edge<K, EV>>> out) {
                        out.collect(new Tuple2<K, Edge<K, 
EV>>(edge.getSource(), edge));
                        out.collect(new Tuple2<K, Edge<K, 
EV>>(edge.getTarget(), edge));
                }
        }
 
-       private static final class EmitOneEdgeWithNeighborPerNode<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable> implements 
FlatMapFunction<
-               Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
+       private static final class EmitOneEdgeWithNeighborPerNode<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+                       implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, 
Edge<K, EV>>> {
                public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, 
Edge<K, EV>>> out) {
                        out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getSource(), edge.getTarget(), edge));
                        out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getTarget(), edge.getSource(), edge));
                }
        }
 
-       private static final class ApplyCoGroupFunction<K extends Comparable<K> 
& Serializable, 
-               VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, T>,
-               ResultTypeQueryable<T> {
-               
+       private static final class ApplyCoGroupFunction<K extends Comparable<K> 
& Serializable, VV extends Serializable, EV extends Serializable, T>
+                       implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, 
T>, ResultTypeQueryable<T> {
+
                private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
-               
-               public ApplyCoGroupFunction (EdgesFunctionWithVertexValue<K, 
VV, EV, T> fun) {
+
+               public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, 
EV, T> fun) {
                        this.function = fun;
                }
+
                public void coGroup(Iterable<Vertex<K, VV>> vertex,
                                Iterable<Edge<K, EV>> edges, Collector<T> out) 
throws Exception {
                        
out.collect(function.iterateEdges(vertex.iterator().next(), edges));
@@ -785,63 +794,62 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
 
                @Override
                public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
+                       return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3,
+                                       null, null);
                }
        }
 
-       private static final class ApplyCoGroupFunctionOnAllEdges<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, 
EV>>, T>,
-               ResultTypeQueryable<T> {
+       private static final class ApplyCoGroupFunctionOnAllEdges<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, 
T>
+                       implements      CoGroupFunction<Vertex<K, VV>, 
Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
 
-       private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
+               private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
 
-       public ApplyCoGroupFunctionOnAllEdges (EdgesFunctionWithVertexValue<K, 
VV, EV, T> fun) {
-               this.function = fun;
-       }
+               public 
ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
+                       this.function = fun;
+               }
 
-       public void coGroup(Iterable<Vertex<K, VV>> vertex, final 
Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges, 
-                       Collector<T> out) throws Exception {
+               public void coGroup(Iterable<Vertex<K, VV>> vertex,     final 
Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
+                               Collector<T> out) throws Exception {
 
-               final Iterator<Edge<K, EV>> edgesIterator = new 
Iterator<Edge<K,EV>>() {
+                       final Iterator<Edge<K, EV>> edgesIterator = new 
Iterator<Edge<K, EV>>() {
 
-                       final Iterator<Tuple2<K, Edge<K, EV>>> 
keysWithEdgesIterator = keysWithEdges.iterator();
+                               final Iterator<Tuple2<K, Edge<K, EV>>> 
keysWithEdgesIterator = keysWithEdges.iterator();
 
-                       @Override
-                       public boolean hasNext() {
-                               return keysWithEdgesIterator.hasNext();
-                       }
+                               @Override
+                               public boolean hasNext() {
+                                       return keysWithEdgesIterator.hasNext();
+                               }
 
-                       @Override
-                       public Edge<K, EV> next() {
-                               return keysWithEdgesIterator.next().f1;
-                       }
+                               @Override
+                               public Edge<K, EV> next() {
+                                       return keysWithEdgesIterator.next().f1;
+                               }
 
-                       @Override
-                       public void remove() {
-                               keysWithEdgesIterator.remove();
-                       }                       
-               };
+                               @Override
+                               public void remove() {
+                                       keysWithEdgesIterator.remove();
+                               }
+                       };
 
-               Iterable<Edge<K, EV>> edgesIterable = new 
Iterable<Edge<K,EV>>() {
-                       public Iterator<Edge<K, EV>> iterator() {
-                               return edgesIterator;
-                       }
-               };
+                       Iterable<Edge<K, EV>> edgesIterable = new 
Iterable<Edge<K, EV>>() {
+                               public Iterator<Edge<K, EV>> iterator() {
+                                       return edgesIterator;
+                               }
+                       };
 
-               out.collect(function.iterateEdges(vertex.iterator().next(), 
edgesIterable));
-       }
+                       
out.collect(function.iterateEdges(vertex.iterator().next(),     edgesIterable));
+               }
 
-       @Override
-       public TypeInformation<T> getProducedType() {
-               return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
+               @Override
+               public TypeInformation<T> getProducedType() {
+                       return 
TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, 
function.getClass(), 3,
+                                       null, null);
+               }
        }
-}
 
        @ConstantFields("0->1;1->0;2->2")
-       private static final class ReverseEdgesMap<K extends Comparable<K> & 
Serializable, 
-               EV extends Serializable> implements MapFunction<Edge<K, EV>,
-               Edge<K, EV>> {
+       private static final class ReverseEdgesMap<K extends Comparable<K> & 
Serializable, EV extends Serializable>
+                       implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
 
                public Edge<K, EV> map(Edge<K, EV> value) {
                        return new Edge<K, EV>(value.f1, value.f0, value.f2);
@@ -850,6 +858,7 @@ public class Graph<K extends Comparable<K> & Serializable, 
VV extends Serializab
 
        /**
         * Reverse the direction of the edges in the graph
+        * 
         * @return a new graph with all edges reversed
         * @throws UnsupportedOperationException
         */
@@ -861,273 +870,278 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        /**
         * @return Singleton DataSet containing the vertex count
         */
-       public DataSet<Integer> numberOfVertices () {
-        return GraphUtils.count(vertices, context);
-    }
+       public DataSet<Integer> numberOfVertices() {
+               return GraphUtils.count(vertices, context);
+       }
 
        /**
         * @return Singleton DataSet containing the edge count
         */
-       public DataSet<Integer> numberOfEdges () {
-        return GraphUtils.count(edges, context);
-    }
-
-    /**
-     * @return The IDs of the vertices as DataSet
-     */
-    public DataSet<K> getVertexIds () {
-        return vertices.map(new ExtractVertexIDMapper<K, VV>());
-    }
-    
-    private static final class ExtractVertexIDMapper<K extends Comparable<K> & 
Serializable, 
-       VV extends Serializable> implements MapFunction<Vertex<K, VV>, K> {
-            @Override
-            public K map(Vertex<K, VV> vertex) {
-                return vertex.f0;
-            }
-    }
-
-    /**
-     * @return The IDs of the edges as DataSet
-     */
-    public DataSet<Tuple2<K, K>> getEdgeIds () {
-        return edges.map(new ExtractEdgeIDsMapper<K, EV>());
-    }
-    
-    private static final class ExtractEdgeIDsMapper<K extends Comparable<K> & 
Serializable, 
-       EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple2<K, 
K>> {
-            @Override
-            public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
-                return new Tuple2<K,K>(edge.f0, edge.f1);
-            }
-    }
+       public DataSet<Integer> numberOfEdges() {
+               return GraphUtils.count(edges, context);
+       }
+
+       /**
+        * @return The IDs of the vertices as DataSet
+        */
+       public DataSet<K> getVertexIds() {
+               return vertices.map(new ExtractVertexIDMapper<K, VV>());
+       }
+
+       private static final class ExtractVertexIDMapper<K extends 
Comparable<K> & Serializable, VV extends Serializable>
+                       implements MapFunction<Vertex<K, VV>, K> {
+               @Override
+               public K map(Vertex<K, VV> vertex) {
+                       return vertex.f0;
+               }
+       }
+
+       /**
+        * @return The IDs of the edges as DataSet
+        */
+       public DataSet<Tuple2<K, K>> getEdgeIds() {
+               return edges.map(new ExtractEdgeIDsMapper<K, EV>());
+       }
+
+       private static final class ExtractEdgeIDsMapper<K extends Comparable<K> 
& Serializable, EV extends Serializable>
+                       implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
+               @Override
+               public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
+                       return new Tuple2<K, K>(edge.f0, edge.f1);
+               }
+       }
 
        /**
         * Checks the weak connectivity of a graph.
-        * @param maxIterations the maximum number of iterations for the inner 
delta iteration
+        * 
+        * @param maxIterations
+        *            the maximum number of iterations for the inner delta 
iteration
         * @return true if the graph is weakly connected.
         */
-       public DataSet<Boolean> isWeaklyConnected (int maxIterations) {
+       public DataSet<Boolean> isWeaklyConnected(int maxIterations) {
                // first, convert to an undirected graph
                Graph<K, VV, EV> graph = this.getUndirected();
 
                DataSet<K> vertexIds = graph.getVertexIds();
-        DataSet<Tuple2<K,K>> verticesWithInitialIds = vertexIds
-                .map(new DuplicateVertexIDMapper<K>());
-
-        DataSet<Tuple2<K,K>> edgeIds = graph.getEdgeIds();
-
-        DeltaIteration<Tuple2<K,K>, Tuple2<K,K>> iteration = 
verticesWithInitialIds
-                .iterateDelta(verticesWithInitialIds, maxIterations, 0);
-
-        DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
-                .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
-                .where(0).equalTo(0)
-                .with(new FindNeighborsJoin<K>())
-                .groupBy(0)
-                .aggregate(Aggregations.MIN, 1)
-                .join(iteration.getSolutionSet(), 
JoinHint.REPARTITION_SORT_MERGE)
-                .where(0).equalTo(0)
-                .with(new VertexWithNewComponentJoin<K>());
-
-        DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, 
changes);
-        DataSet<Boolean> result = 
GraphUtils.count(components.groupBy(1).reduceGroup(
-                       new EmitFirstReducer<K>()), context).map(new 
CheckIfOneComponentMapper());      
-        return result;
-    }
-       
+               DataSet<Tuple2<K, K>> verticesWithInitialIds = vertexIds
+                               .map(new DuplicateVertexIDMapper<K>());
+
+               DataSet<Tuple2<K, K>> edgeIds = graph.getEdgeIds();
+
+               DeltaIteration<Tuple2<K, K>, Tuple2<K, K>> iteration = 
verticesWithInitialIds
+                               .iterateDelta(verticesWithInitialIds, 
maxIterations, 0);
+
+               DataSet<Tuple2<K, K>> changes = iteration.getWorkset()
+                               .join(edgeIds, JoinHint.REPARTITION_SORT_MERGE)
+                               .where(0).equalTo(0).with(new 
FindNeighborsJoin<K>())
+                               .groupBy(0).aggregate(Aggregations.MIN, 1)
+                               .join(iteration.getSolutionSet(), 
JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
+                               .with(new VertexWithNewComponentJoin<K>());
+
+               DataSet<Tuple2<K, K>> components = iteration.closeWith(changes, 
changes);
+               DataSet<Boolean> result = 
GraphUtils.count(components.groupBy(1).reduceGroup(new EmitFirstReducer<K>()),
+                               context).map(new CheckIfOneComponentMapper());
+               return result;
+       }
+
        private static final class DuplicateVertexIDMapper<K> implements 
MapFunction<K, Tuple2<K, K>> {
-            @Override
-            public Tuple2<K, K> map(K k) {
-                return new Tuple2<K, K>(k, k);
-            }
-       }
-       
-       private static final class FindNeighborsJoin<K> implements 
JoinFunction<Tuple2<K, K>, Tuple2<K, K>, 
-               Tuple2<K, K>> {
-        @Override
-        public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, Tuple2<K, 
K> edge) {
-            return new Tuple2<K,K>(edge.f1, vertexWithComponent.f1);
-        }
-       }
-
-       private static final class VertexWithNewComponentJoin<K extends 
Comparable<K>> 
-               implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, 
Tuple2<K, K>> {
-        @Override
-        public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, 
Collector<Tuple2<K, K>> out) {
-            if (candidate.f1.compareTo(old.f1) < 0) {
-                out.collect(candidate);
-            }
-        }
-       }
-       
-       private static final class EmitFirstReducer<K> implements 
-               GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> {
+               @Override
+               public Tuple2<K, K> map(K k) {
+                       return new Tuple2<K, K>(k, k);
+               }
+       }
+
+       private static final class FindNeighborsJoin<K> implements 
JoinFunction<Tuple2<K, K>, Tuple2<K, K>, Tuple2<K, K>> {
+               @Override
+               public Tuple2<K, K> join(Tuple2<K, K> vertexWithComponent, 
Tuple2<K, K> edge) {
+                       return new Tuple2<K, K>(edge.f1, 
vertexWithComponent.f1);
+               }
+       }
+
+       private static final class VertexWithNewComponentJoin<K extends 
Comparable<K>>
+                       implements FlatJoinFunction<Tuple2<K, K>, Tuple2<K, K>, 
Tuple2<K, K>> {
+               @Override
+               public void join(Tuple2<K, K> candidate, Tuple2<K, K> old, 
Collector<Tuple2<K, K>> out) {
+                       if (candidate.f1.compareTo(old.f1) < 0) {
+                               out.collect(candidate);
+                       }
+               }
+       }
+
+       private static final class EmitFirstReducer<K> implements 
GroupReduceFunction<Tuple2<K, K>, Tuple2<K, K>> {
                public void reduce(Iterable<Tuple2<K, K>> values, 
Collector<Tuple2<K, K>> out) {
-                       out.collect(values.iterator().next());                  
+                       out.collect(values.iterator().next());
                }
        }
-       
-       private static final class CheckIfOneComponentMapper implements 
MapFunction<Integer, Boolean> {
-        @Override
-        public Boolean map(Integer n) {
-               return (n == 1);
-        }
+
+       private static final class CheckIfOneComponentMapper implements 
MapFunction<Integer, Boolean> {
+               @Override
+               public Boolean map(Integer n) {
+                       return (n == 1);
+               }
        }
-       
+
        /**
-        * Adds the input vertex and edges to the graph.
-        * If the vertex already exists in the graph, it will not be added 
again,
-        * but the given edges will.
+        * Adds the input vertex and edges to the graph. If the vertex already
+        * exists in the graph, it will not be added again, but the given edges
+        * will.
+        * 
         * @param vertex the vertex to add to the graph
         * @param edges a list of edges to add to the graph
-        * @return the new graph containing the existing and newly added 
vertices and edges
+        * @return the new graph containing the existing and newly added 
vertices
+        *         and edges
+        */
+       @SuppressWarnings("unchecked")
+       public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex, 
List<Edge<K, EV>> edges) {
+               DataSet<Vertex<K, VV>> newVertex = 
this.context.fromElements(vertex);
+
+               // Take care of empty edge set
+               if (edges.isEmpty()) {
+                       return new Graph<K, VV, 
EV>(this.vertices.union(newVertex)
+                                       .distinct(), this.edges, this.context);
+               }
+
+               // Add the vertex and its edges
+               DataSet<Vertex<K, VV>> newVertices = 
this.vertices.union(newVertex).distinct();
+               DataSet<Edge<K, EV>> newEdges = 
this.edges.union(context.fromCollection(edges));
+
+               return new Graph<K, VV, EV>(newVertices, newEdges, 
this.context);
+       }
+
+       /**
+        * Adds the given edge to the graph. If the source and target vertices 
do
+        * not exist in the graph, they will also be added.
+        * 
+        * @param source the source vertex of the edge
+        * @param target the target vertex of the edge
+        * @param edgeValue the edge value
+        * @return the new graph containing the existing vertices and edges 
plus the
+        *         newly added edge
         */
        @SuppressWarnings("unchecked")
-       public Graph<K, VV, EV> addVertex (final Vertex<K,VV> vertex, 
List<Edge<K, EV>> edges) {
-       DataSet<Vertex<K, VV>> newVertex = this.context.fromElements(vertex);
-
-       // Take care of empty edge set
-       if (edges.isEmpty()) {
-               return new Graph<K, VV, 
EV>(this.vertices.union(newVertex).distinct(), this.edges, this.context);
-       }
-
-       // Add the vertex and its edges
-       DataSet<Vertex<K, VV>> newVertices = 
this.vertices.union(newVertex).distinct();
-       DataSet<Edge<K, EV>> newEdges = 
this.edges.union(context.fromCollection(edges));
-
-       return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
-    }
-
-    /**
-     * Adds the given edge to the graph.
-     * If the source and target vertices do not exist in the graph,
-     * they will also be added.
-     * @param source the source vertex of the edge
-     * @param target the target vertex of the edge
-     * @param edgeValue the edge value
-     * @return the new graph containing the existing vertices and edges plus 
the newly added edge
-     */
-    public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV> target, 
EV edgeValue) {
-       Graph<K,VV,EV> partialGraph = fromCollection(
-                               Arrays.asList(source, target),
+       public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> 
target, EV edgeValue) {
+               Graph<K, VV, EV> partialGraph = 
fromCollection(Arrays.asList(source, target),
                                Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)),
-                               this.context
-               );
-        return this.union(partialGraph);
-    }
+                               this.context);
+               return this.union(partialGraph);
+       }
 
        /**
         * Removes the given vertex and its edges from the graph.
+        * 
         * @param vertex the vertex to remove
-        * @return the new graph containing the existing vertices and edges 
without the removed vertex and its edges
+        * @return the new graph containing the existing vertices and edges 
without
+        *         the removed vertex and its edges
         */
-       public Graph<K, VV, EV> removeVertex (Vertex<K,VV> vertex) {
-
-               DataSet<Vertex<K, VV>> newVertices = getVertices().filter(
-                               new RemoveVertexFilter<K, VV>(vertex));
-               DataSet<Edge<K, EV>> newEdges = getEdges().filter(
-                               new VertexRemovalEdgeFilter<K, VV, EV>(vertex));
-        return new Graph<K, VV, EV>(newVertices, newEdges, this.context);
-    }
-    
-    private static final class RemoveVertexFilter<K extends Comparable<K> & 
Serializable, 
-               VV extends Serializable> implements FilterFunction<Vertex<K, 
VV>> {
-
-       private Vertex<K, VV> vertexToRemove;
-
-        public RemoveVertexFilter(Vertex<K, VV> vertex) {
-               vertexToRemove = vertex;
+       public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
+
+               DataSet<Vertex<K, VV>> newVertices = getVertices().filter(new 
RemoveVertexFilter<K, VV>(vertex));
+               DataSet<Edge<K, EV>> newEdges = getEdges().filter(new 
VertexRemovalEdgeFilter<K, VV, EV>(vertex));
+               return new Graph<K, VV, EV>(newVertices, newEdges, 
this.context);
+       }
+
+       private static final class RemoveVertexFilter<K extends Comparable<K> & 
Serializable, VV extends Serializable>
+                       implements FilterFunction<Vertex<K, VV>> {
+
+               private Vertex<K, VV> vertexToRemove;
+
+               public RemoveVertexFilter(Vertex<K, VV> vertex) {
+                       vertexToRemove = vertex;
                }
 
-        @Override
-        public boolean filter(Vertex<K, VV> vertex) throws Exception {
-               return !vertex.f0.equals(vertexToRemove.f0);
-        }
-    }
-    
-    private static final class VertexRemovalEdgeFilter<K extends Comparable<K> 
& Serializable, 
-       VV extends Serializable, EV extends Serializable> implements 
FilterFunction<Edge<K, EV>> {
+               @Override
+               public boolean filter(Vertex<K, VV> vertex) throws Exception {
+                       return !vertex.f0.equals(vertexToRemove.f0);
+               }
+       }
+
+       private static final class VertexRemovalEdgeFilter<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+                       implements FilterFunction<Edge<K, EV>> {
 
-       private Vertex<K, VV> vertexToRemove;
+               private Vertex<K, VV> vertexToRemove;
 
-        public VertexRemovalEdgeFilter(Vertex<K, VV> vertex) {
+               public VertexRemovalEdgeFilter(Vertex<K, VV> vertex) {
                        vertexToRemove = vertex;
                }
 
-        @Override
-        public boolean filter(Edge<K, EV> edge) throws Exception {
-               
-               if (edge.f0.equals(vertexToRemove.f0)) {
-                return false;
-            }
-            if (edge.f1.equals(vertexToRemove.f0)) {
-                return false;
-            }
-            return true;
-        }
-    }
-
-    /**
-     * Removes all edges that match the given edge from the graph.
-     * @param edge the edge to remove
-     * @return the new graph containing the existing vertices and edges 
without the removed edges
-     */
-    public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) {
-               DataSet<Edge<K, EV>> newEdges = getEdges().filter(
-                               new EdgeRemovalEdgeFilter<K, EV>(edge));
-        return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
-    }
-    
-    private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> & 
Serializable, 
-               EV extends Serializable> implements FilterFunction<Edge<K, EV>> 
{
-       private Edge<K, EV> edgeToRemove;
-
-        public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
+               @Override
+               public boolean filter(Edge<K, EV> edge) throws Exception {
+
+                       if (edge.f0.equals(vertexToRemove.f0)) {
+                               return false;
+                       }
+                       if (edge.f1.equals(vertexToRemove.f0)) {
+                               return false;
+                       }
+                       return true;
+               }
+       }
+
+       /**
+        * Removes all edges that match the given edge from the graph.
+        * 
+        * @param edge the edge to remove
+        * @return the new graph containing the existing vertices and edges 
without
+        *         the removed edges
+        */
+       public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
+               DataSet<Edge<K, EV>> newEdges = getEdges().filter(new 
EdgeRemovalEdgeFilter<K, EV>(edge));
+               return new Graph<K, VV, EV>(this.vertices, newEdges, 
this.context);
+       }
+
+       private static final class EdgeRemovalEdgeFilter<K extends 
Comparable<K> & Serializable, EV extends Serializable>
+                       implements FilterFunction<Edge<K, EV>> {
+               private Edge<K, EV> edgeToRemove;
+
+               public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
                        edgeToRemove = edge;
                }
 
-        @Override
-        public boolean filter(Edge<K, EV> edge) {
-               return (!(edge.f0.equals(edgeToRemove.f0) 
-                               && edge.f1.equals(edgeToRemove.f1)));
-        }
-    }
-
-    /**
-     * Performs union on the vertices and edges sets of the input graphs
-     * removing duplicate vertices but maintaining duplicate edges.
-     * @param graph the graph to perform union with
-     * @return a new graph
-     */
-    public Graph<K, VV, EV> union (Graph<K, VV, EV> graph) {
-        DataSet<Vertex<K,VV>> unionedVertices = 
graph.getVertices().union(this.getVertices()).distinct();
-        DataSet<Edge<K,EV>> unionedEdges = 
graph.getEdges().union(this.getEdges());
-        return new Graph<K,VV,EV>(unionedVertices, unionedEdges, this.context);
-    }
+               @Override
+               public boolean filter(Edge<K, EV> edge) {
+                       return (!(edge.f0.equals(edgeToRemove.f0) && edge.f1
+                                       .equals(edgeToRemove.f1)));
+               }
+       }
+
+       /**
+        * Performs union on the vertices and edges sets of the input graphs
+        * removing duplicate vertices but maintaining duplicate edges.
+        * 
+        * @param graph the graph to perform union with
+        * @return a new graph
+        */
+       public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
+
+               DataSet<Vertex<K, VV>> unionedVertices = 
graph.getVertices().union(this.getVertices()).distinct();
+               DataSet<Edge<K, EV>> unionedEdges = 
graph.getEdges().union(this.getEdges());
+               return new Graph<K, VV, EV>(unionedVertices, unionedEdges, 
this.context);
+       }
 
        /**
         * Runs a Vertex-Centric iteration on the graph.
+        * 
         * @param vertexUpdateFunction the vertex update function
         * @param messagingFunction the messaging function
         * @param maximumNumberOfIterations maximum number of iterations to 
perform
         * @return
         */
-       public <M>Graph<K, VV, EV> 
runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-               MessagingFunction<K, VV, M, EV> messagingFunction, int 
maximumNumberOfIterations) {
-       DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
-                            VertexCentricIteration.withEdges(edges,
-                                               vertexUpdateFunction, 
messagingFunction, maximumNumberOfIterations));
+       public <M> Graph<K, VV, EV> 
runVertexCentricIteration(VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+                       MessagingFunction<K, VV, M, EV> messagingFunction,      
int maximumNumberOfIterations) {
+               DataSet<Vertex<K, VV>> newVertices = 
vertices.runOperation(VertexCentricIteration
+                               .withEdges(edges, vertexUpdateFunction, 
messagingFunction, maximumNumberOfIterations));
                return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
-    }
+       }
 
-       public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {
+       public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) {
                return algorithm.run(this);
        }
 
        /**
-        * Compute an aggregate over the neighbors (edges and vertices) of each 
vertex.
-        * The function applied on the neighbors has access to the vertex value.
+        * Compute an aggregate over the neighbors (edges and vertices) of each
+        * vertex. The function applied on the neighbors has access to the 
vertex
+        * value.
+        * 
         * @param neighborsFunction the function to apply to the neighborhood
         * @param direction the edge direction (in-, out-, all-)
         * @param <T> the output type
@@ -1139,23 +1153,27 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                switch (direction) {
                case IN:
                        // create <edge-sourceVertex> pairs
-                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithSources = edges.join(this.vertices)
-                               .where(0).equalTo(0);
-                       return 
vertices.coGroup(edgesWithSources).where(0).equalTo("f0.f1").with(
-                                       new ApplyNeighborCoGroupFunction<K, VV, 
EV, T>(neighborsFunction));
+                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithSources = edges
+                                       
.join(this.vertices).where(0).equalTo(0);
+                       return vertices.coGroup(edgesWithSources)
+                                       .where(0).equalTo("f0.f1")
+                                       .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
                case OUT:
                        // create <edge-targetVertex> pairs
-                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges.join(this.vertices)
-                               .where(1).equalTo(0);
-                       return 
vertices.coGroup(edgesWithTargets).where(0).equalTo("f0.f0").with(
-                                       new ApplyNeighborCoGroupFunction<K, VV, 
EV, T>(neighborsFunction));
+                       DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
+                                       
.join(this.vertices).where(1).equalTo(0);
+                       return vertices.coGroup(edgesWithTargets)
+                                       .where(0).equalTo("f0.f0")
+                                       .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
                case ALL:
                        // create <edge-sourceOrTargetVertex> pairs
-                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges.flatMap(
-                                       new EmitOneEdgeWithNeighborPerNode<K, 
VV, EV>()).join(this.vertices)
-                                       .where(1).equalTo(0).with(new 
ProjectEdgeWithNeighbor<K, VV, EV>());
+                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
+                                       .flatMap(new 
EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
+                                       .join(this.vertices).where(1).equalTo(0)
+                                       .with(new ProjectEdgeWithNeighbor<K, 
VV, EV>());
 
-                       return 
vertices.coGroup(edgesWithNeighbors).where(0).equalTo(0)
+                       return vertices.coGroup(edgesWithNeighbors)
+                                       .where(0).equalTo(0)
                                        .with(new 
ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
@@ -1163,9 +1181,10 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
        }
 
        /**
-        * Compute an aggregate over the neighbors (edges and vertices) of each 
vertex.
-        * The function applied on the neighbors only has access to the vertex 
id
-        * (not the vertex value).
+        * Compute an aggregate over the neighbors (edges and vertices) of each
+        * vertex. The function applied on the neighbors only has access to the
+        * vertex id (not the vertex value).
+        * 
         * @param neighborsFunction the function to apply to the neighborhood
         * @param direction the edge direction (in-, out-, all-)
         * @param <T> the output type
@@ -1177,21 +1196,24 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                switch (direction) {
                case IN:
                        // create <edge-sourceVertex> pairs
-                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithSources = edges.join(this.vertices)
-                               .where(0).equalTo(0).with(new 
ProjectVertexIdJoin<K, VV, EV>(1));
+                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithSources = edges
+                                       .join(this.vertices).where(0).equalTo(0)
+                                       .with(new ProjectVertexIdJoin<K, VV, 
EV>(1));
                        return edgesWithSources.groupBy(0).reduceGroup(
                                        new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
                case OUT:
                        // create <edge-targetVertex> pairs
-                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges.join(this.vertices)
-                       .where(1).equalTo(0).with(new ProjectVertexIdJoin<K, 
VV, EV>(0));
-               return edgesWithTargets.groupBy(0).reduceGroup(
-                               new ApplyNeighborGroupReduceFunction<K, VV, EV, 
T>(neighborsFunction));
+                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
+                                       .join(this.vertices).where(1).equalTo(0)
+                                       .with(new ProjectVertexIdJoin<K, VV, 
EV>(0));
+                       return edgesWithTargets.groupBy(0).reduceGroup(
+                                       new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
                case ALL:
                        // create <edge-sourceOrTargetVertex> pairs
-                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges.flatMap(
-                                       new EmitOneEdgeWithNeighborPerNode<K, 
VV, EV>()).join(this.vertices)
-                                       .where(1).equalTo(0).with(new 
ProjectEdgeWithNeighbor<K, VV, EV>());
+                       DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
+                                       .flatMap(new 
EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
+                                       .join(this.vertices).where(1).equalTo(0)
+                                       .with(new ProjectEdgeWithNeighbor<K, 
VV, EV>());
 
                        return edgesWithNeighbors.groupBy(0).reduceGroup(
                                        new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
@@ -1200,125 +1222,118 @@ public class Graph<K extends Comparable<K> & 
Serializable, VV extends Serializab
                }
        }
 
-       private static final class ApplyNeighborGroupReduceFunction<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable, T> implements 
GroupReduceFunction<
-               Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>,      
ResultTypeQueryable<T> {
-       
+       private static final class ApplyNeighborGroupReduceFunction<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, 
T>
+                       implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, 
Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+
                private NeighborsFunction<K, VV, EV, T> function;
-       
+
                public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, 
VV, EV, T> fun) {
                        this.function = fun;
                }
-       
-               public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> edges,
-                               Collector<T> out) throws Exception {
+
+               public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> edges, Collector<T> out) throws Exception {
                        out.collect(function.iterateNeighbors(edges));
-                       
+
                }
 
                @Override
                public TypeInformation<T> getProducedType() {
                        return 
TypeExtractor.createTypeInfo(NeighborsFunction.class, function.getClass(), 3, 
null, null);
-               }       
+               }
        }
 
-       private static final class ProjectVertexIdJoin<K extends Comparable<K> 
& Serializable, 
-               VV extends Serializable, EV extends Serializable> implements 
FlatJoinFunction<Edge<K, EV>, 
-               Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+       private static final class ProjectVertexIdJoin<K extends Comparable<K> 
& Serializable, VV extends Serializable, EV extends Serializable>
+                       implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, 
Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
 
                private int fieldPosition;
 
                public ProjectVertexIdJoin(int position) {
                        this.fieldPosition = position;
                }
+
                @SuppressWarnings("unchecked")
-               public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
+               public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
                                Collector<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> out) {
-                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(
-                                       (K)edge.getField(fieldPosition), edge, 
otherVertex));
+                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
                }
        }
 
-       private static final class ProjectEdgeWithNeighbor<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable> implements 
-               FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, 
Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+       private static final class ProjectEdgeWithNeighbor<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+                       implements      FlatJoinFunction<Tuple3<K, K, Edge<K, 
EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
                public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, 
Vertex<K, VV> neighbor,
                                Collector<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> out) {
-                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>(keysWithEdge.f0, 
-                                       keysWithEdge.f2, neighbor));
+
+                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
                }
        }
 
-       private static final class ApplyNeighborCoGroupFunction<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple2<Edge<K, EV>, 
Vertex<K, VV>>, T>,
-               ResultTypeQueryable<T> {
-       
+       private static final class ApplyNeighborCoGroupFunction<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, 
T>
+                       implements CoGroupFunction<Vertex<K, VV>, 
Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
+
                private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-               
-               public ApplyNeighborCoGroupFunction 
(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+
+               public 
ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> 
fun) {
                        this.function = fun;
                }
-               public void coGroup(Iterable<Vertex<K, VV>> vertex,
-                               Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighbors, Collector<T> out) throws Exception {
-                       
out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors));
+
+               public void coGroup(Iterable<Vertex<K, VV>> vertex, 
Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
+                               Collector<T> out) throws Exception {
+                       
out.collect(function.iterateNeighbors(vertex.iterator().next(), neighbors));
                }
+
                @Override
                public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
-                                                       function.getClass(), 3, 
null, null);
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,     
function.getClass(), 3, null, null);
                }
        }
 
-       private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends 
Comparable<K> & Serializable, 
-               VV extends Serializable, EV extends Serializable, T> 
-               implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>, T>,
-               ResultTypeQueryable<T> {
+       private static final class ApplyCoGroupFunctionOnAllNeighbors<K extends 
Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, 
T>
+                       implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, 
Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
 
                private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-               
-               public ApplyCoGroupFunctionOnAllNeighbors 
(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
+
+               public 
ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, 
T> fun) {
                        this.function = fun;
                }
 
-               public void coGroup(Iterable<Vertex<K, VV>> vertex, final 
Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, 
+               public void coGroup(Iterable<Vertex<K, VV>> vertex,
+                               final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> keysWithNeighbors, 
                                Collector<T> out) throws Exception {
-               
+
                        final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
-               
-                               final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> keysWithEdgesIterator = 
-                                               keysWithNeighbors.iterator();
-               
+
+                               final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>> keysWithEdgesIterator = keysWithNeighbors.iterator();
+
                                @Override
                                public boolean hasNext() {
                                        return keysWithEdgesIterator.hasNext();
                                }
-               
+
                                @Override
                                public Tuple2<Edge<K, EV>, Vertex<K, VV>> 
next() {
-                                       Tuple3<K, Edge<K, EV>, Vertex<K, VV>> 
next = keysWithEdgesIterator.next(); 
+                                       Tuple3<K, Edge<K, EV>, Vertex<K, VV>> 
next = keysWithEdgesIterator.next();
                                        return new Tuple2<Edge<K, EV>, 
Vertex<K, VV>>(next.f1, next.f2);
                                }
 
                                @Override
                                public void remove() {
                                        keysWithEdgesIterator.remove();
-                               }                       
+                               }
                        };
-                       
+
                        Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
                                public Iterator<Tuple2<Edge<K, EV>, Vertex<K, 
VV>>> iterator() {
                                        return neighborsIterator;
                                }
                        };
-               
-                       
out.collect(function.iterateNeighbors(vertex.iterator().next(), 
neighborsIterable));
-                       }
+
+                       
out.collect(function.iterateNeighbors(vertex.iterator().next(),
+                                       neighborsIterable));
+               }
 
                @Override
                public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
-                                                       function.getClass(), 3, 
null, null);
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,     
function.getClass(), 3, null, null);
                }
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index 2f5de95..f5e7018 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -1,5 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
+package org.apache.flink.graph;
 
 import java.io.Serializable;
 
@@ -8,9 +25,7 @@ import java.io.Serializable;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV 
extends Serializable,
-        EV extends Serializable> {
-
-    public Graph<K,VV,EV> run (Graph<K,VV,EV> input);
+public interface GraphAlgorithm<K extends Comparable<K> & Serializable, VV 
extends Serializable, EV extends Serializable> {
 
+       public Graph<K, VV, EV> run(Graph<K, VV, EV> input);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index 124aea0..e0dfe11 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index d7b438c..c774024 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -1,4 +1,22 @@
-package flink.graphs;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
index e589096..0b19e0e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs;
+package org.apache.flink.graph;
 
 import java.io.Serializable;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 48a5fe4..33f8f1a 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -1,4 +1,22 @@
-package flink.graphs.example;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import java.util.Collection;
 
@@ -10,11 +28,10 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
 
-import flink.graphs.Graph;
-import flink.graphs.example.utils.ExampleUtils;
-
 /**
  * 
  * A simple example to illustrate the basic functionality of the graph-api.
@@ -42,7 +59,7 @@ public class GraphMetrics implements ProgramDescription {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                /** create a random graph **/
-               Graph<Long, NullValue, NullValue> graph = 
Graph.create(ExampleUtils
+               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(ExampleUtils
                                .getRandomEdges(env, NUM_VERTICES), env);
                
                /** get the number of vertices **/
@@ -71,13 +88,13 @@ public class GraphMetrics implements ProgramDescription {
                DataSet<Long> minOutDegreeVertex = 
graph.outDegrees().minBy(1).map(new ProjectVertexId());
                
                /** print the results **/
-               ExampleUtils.printResult(numVertices, "Total number of 
vertices", env);
-               ExampleUtils.printResult(numEdges, "Total number of edges", 
env);
-               ExampleUtils.printResult(avgNodeDegree, "Average node degree", 
env);
-               ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max 
in-degree", env);
-               ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min 
in-degree", env);
-               ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max 
out-degree", env);
-               ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min 
out-degree", env);
+               ExampleUtils.printResult(numVertices, "Total number of 
vertices");
+               ExampleUtils.printResult(numEdges, "Total number of edges");
+               ExampleUtils.printResult(avgNodeDegree, "Average node degree");
+               ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max 
in-degree");
+               ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min 
in-degree");
+               ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max 
out-degree");
+               ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min 
out-degree");
 
                env.execute();
        }

Reply via email to