http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index 50c9ae5..c490bb3 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -1,71 +1,94 @@
-package flink.graphs.example;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.*;
-import flink.graphs.library.LabelPropagation;
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.*;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.LabelPropagation;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
 /**
- * This example uses the label propagation algorithm to detect communities by 
propagating labels.
- * Initially, each vertex is assigned its id as its label.
- * The vertices iteratively propagate their labels to their neighbors and 
adopt the most frequent label
- * among their neighbors.
- * The algorithm converges when no vertex changes value or the maximum number 
of iterations have been reached.
+ * This example uses the label propagation algorithm to detect communities by
+ * propagating labels. Initially, each vertex is assigned its id as its label.
+ * The vertices iteratively propagate their labels to their neighbors and adopt
+ * the most frequent label among their neighbors. The algorithm converges when
+ * no vertex changes value or the maximum number of iterations has been
+ * reached.
  */
 public class LabelPropagationExample implements ProgramDescription {
 
-    public static void main (String [] args) throws Exception {
+       public static void main(String[] args) throws Exception {
 
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
-        DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+               DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+               DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
 
-        Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, 
edges, env);
+               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(vertices, edges, env);
 
-        DataSet<Vertex<Long, Long>> verticesWithCommunity =
-                graph.run(new 
LabelPropagation<Long>(maxIterations)).getVertices();
+               DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
+                               new 
LabelPropagation<Long>(maxIterations)).getVertices();
 
-        verticesWithCommunity.print();
+               verticesWithCommunity.print();
 
-        env.execute();
-    }
+               env.execute();
+       }
 
-    @Override
-    public String getDescription() {
-        return "Label Propagation Example";
-    }
+       @Override
+       public String getDescription() {
+               return "Label Propagation Example";
+       }
 
-    private static long numVertices = 100;
-    private static int maxIterations = 20;
+       private static long numVertices = 100;
+       private static int maxIterations = 20;
 
        @SuppressWarnings("serial")
        private static DataSet<Vertex<Long, Long>> 
getVertexDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numVertices)
-                    .map(new MapFunction<Long, Vertex<Long, Long>>() {
-                        public Vertex<Long, Long> map(Long l) throws Exception 
{
-                            return new Vertex<Long, Long>(l, l);
-                        }
-                    });
-    }
+               return env.generateSequence(1, numVertices).map(
+                               new MapFunction<Long, Vertex<Long, Long>>() {
+                                       public Vertex<Long, Long> map(Long l) 
throws Exception {
+                                               return new Vertex<Long, 
Long>(l, l);
+                                       }
+                               });
+       }
 
-    @SuppressWarnings("serial")
+       @SuppressWarnings("serial")
        private static DataSet<Edge<Long, NullValue>> 
getEdgeDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numVertices)
-                    .flatMap(new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
-                        @Override
-                        public void flatMap(Long key, Collector<Edge<Long, 
NullValue>> out) {
-                            int numOutEdges = (int) (Math.random() * 
(numVertices / 2));
-                            for (int i = 0; i < numOutEdges; i++) {
-                                long target = (long) (Math.random() * 
numVertices) + 1;
-                                out.collect(new Edge<Long, NullValue>(key, 
target, NullValue.getInstance()));
-                            }
-                        }
-                    });
-    }
+               return env.generateSequence(1, numVertices).flatMap(
+                               new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
+                                       @Override
+                                       public void flatMap(Long key,
+                                                       Collector<Edge<Long, 
NullValue>> out) {
+                                               int numOutEdges = (int) 
(Math.random() * (numVertices / 2));
+                                               for (int i = 0; i < 
numOutEdges; i++) {
+                                                       long target = (long) 
(Math.random() * numVertices) + 1;
+                                                       out.collect(new 
Edge<Long, NullValue>(key, target,
+                                                                       
NullValue.getInstance()));
+                                               }
+                                       }
+                               });
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 668c765..948ac5b 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -1,4 +1,22 @@
-package flink.graphs.example;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -13,176 +31,186 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.MusicProfilesData;
+import org.apache.flink.graph.library.LabelPropagation;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.EdgeDirection;
-import flink.graphs.EdgesFunctionWithVertexValue;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.example.utils.MusicProfilesData;
-import flink.graphs.library.LabelPropagation;
-
 @SuppressWarnings("serial")
 public class MusicProfiles implements ProgramDescription {
 
        /**
-        * This example demonstrates how to mix the "record" Flink API with the 
graph API.
-        * The input is a set <userId - songId - playCount> triplets and a set 
of
-        * bad records,i.e. song ids that should not be trusted.
-        * Initially, we use the record API to filter out the bad records.
-        * Then, we use the graph API to create a user -> song weighted 
bipartite graph
-        * and compute the top song (most listened) per user.
-        * Then, we use the record API again, to create a user-user similarity 
graph, 
-        * based on common songs, where two users that listen to the same song 
are connected.
-        * Finally, we use the graph API to run the label propagation community 
detection algorithm
-        * on the similarity graph.
+        * This example demonstrates how to mix the "record" Flink API with the
+        * graph API. The input is a set of <userId - songId - playCount> triplets 
and
+        * a set of bad records, i.e. song ids that should not be trusted. 
Initially,
+        * we use the record API to filter out the bad records. Then, we use the
+        * graph API to create a user -> song weighted bipartite graph and 
compute
+        * the top song (most listened) per user. Then, we use the record API 
again,
+        * to create a user-user similarity graph, based on common songs, where 
two
+        * users that listen to the same song are connected. Finally, we use the
+        * graph API to run the label propagation community detection algorithm 
on
+        * the similarity graph.
         */
-       public static void main (String [] args) throws Exception {
-       
-       ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-       final int numIterations = 10;
-
-       /** 
-        *  Read the user-song-play triplets
-        *  The format is <userID>\t<songID>\t<playcount>
-        */
-       DataSet<Tuple3<String, String, Integer>> triplets = 
MusicProfilesData.getUserSongTriplets(env);
-
-       /**
-        *  Read the mismatches dataset and extract the songIDs
-        *  The format is "ERROR: <songID trackID> song_title"
-        */
-       DataSet<Tuple1<String>> mismatches = 
MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
-
-       /**
-        *  Filter out the mismatches from the triplets dataset
-        */
-       DataSet<Tuple3<String, String, Integer>> validTriplets = 
triplets.coGroup(mismatches)
-                       .where(1).equalTo(0).with(new FilterOutMismatches());
-
-       /**
-        *  Create a user -> song weighted bipartite graph
-        *  where the edge weights correspond to play counts
-        */
-       Graph<String, NullValue, Integer> userSongGraph = 
Graph.fromTupleDataSet(validTriplets, env);
-
-       /**
-        *  Get the top track (most listened) for each user
-        */
-       DataSet<Tuple2<String, String>> usersWithTopTrack = 
userSongGraph.reduceOnEdges(new GetTopSongPerUser(), 
-                       EdgeDirection.OUT).filter(new FilterSongNodes());
-
-       usersWithTopTrack.print();
-
-       /**
-        * Create a user-user similarity graph, based on common songs, 
-        * i.e. two users that listen to the same song are connected.
-        * For each song, we create an edge between each pair of its 
in-neighbors.
-        */
-       DataSet<Edge<String, NullValue>> similarUsers = 
userSongGraph.getEdges().groupBy(1)
-                       .reduceGroup(new CreateSimilarUserEdges()).distinct();
-
-       Graph<String, Long, NullValue> similarUsersGraph = 
Graph.fromDataSet(similarUsers,
-
-                       new MapFunction<String, Long>() {
-                                       public Long map(String value) { return 
1l; }
-
-       }, env).getUndirected();
-
-       /**
-        * Detect user communities using the label propagation library method
-        */
-
-       // Initialize each vertex with a unique numeric label
-       DataSet<Tuple2<String, Long>> idsWithInitialLabels = 
similarUsersGraph.getVertices()
-                       .reduceGroup(new AssignInitialLabelReducer());
-
-       // update the vertex values and run the label propagation algorithm
-       DataSet<Vertex<String, Long>> verticesWithCommunity = 
similarUsersGraph.joinWithVertices(idsWithInitialLabels,
-                                       new MapFunction<Tuple2<Long, Long>, 
Long>() {
-                                                       public Long 
map(Tuple2<Long, Long> value) {     return value.f1; }
-                                               })
-                       .run(new 
LabelPropagation<String>(numIterations)).getVertices();
-
-       verticesWithCommunity.print();
-
-       env.execute();
-    }
+       public static void main(String[] args) throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               final int numIterations = 10;
+
+               /**
+                * Read the user-song-play triplets. The format is
+                * <userID>\t<songID>\t<playcount>
+                */
+               DataSet<Tuple3<String, String, Integer>> triplets = 
MusicProfilesData.getUserSongTriplets(env);
+
+               /**
+                * Read the mismatches dataset and extract the songIDs. The 
format is
+                * "ERROR: <songID trackID> song_title"
+                */
+               DataSet<Tuple1<String>> mismatches = 
MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
+
+               /**
+                * Filter out the mismatches from the triplets dataset
+                */
+               DataSet<Tuple3<String, String, Integer>> validTriplets = 
triplets
+                               .coGroup(mismatches).where(1).equalTo(0)
+                               .with(new FilterOutMismatches());
+
+               /**
+                * Create a user -> song weighted bipartite graph where the 
edge weights
+                * correspond to play counts
+                */
+               Graph<String, NullValue, Integer> userSongGraph = 
Graph.fromTupleDataSet(validTriplets, env);
+
+               /**
+                * Get the top track (most listened) for each user
+                */
+               DataSet<Tuple2<String, String>> usersWithTopTrack = 
userSongGraph
+                               .reduceOnEdges(new GetTopSongPerUser(), 
EdgeDirection.OUT)
+                               .filter(new FilterSongNodes());
+
+               usersWithTopTrack.print();
+
+               /**
+                * Create a user-user similarity graph, based on common songs, 
i.e. two
+                * users that listen to the same song are connected. For each 
song, we
+                * create an edge between each pair of its in-neighbors.
+                */
+               DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
+                               .getEdges().groupBy(1)
+                               .reduceGroup(new 
CreateSimilarUserEdges()).distinct();
+
+               Graph<String, Long, NullValue> similarUsersGraph = 
Graph.fromDataSet(similarUsers,
+                               new MapFunction<String, Long>() {
+                                       public Long map(String value) {
+                                               return 1l;
+                                       }
+                               }, env).getUndirected();
+
+               /**
+                * Detect user communities using the label propagation library 
method
+                */
+
+               // Initialize each vertex with a unique numeric label
+               DataSet<Tuple2<String, Long>> idsWithInitialLabels = 
similarUsersGraph
+                               .getVertices().reduceGroup(new 
AssignInitialLabelReducer());
+
+               // update the vertex values and run the label propagation 
algorithm
+               DataSet<Vertex<String, Long>> verticesWithCommunity = 
similarUsersGraph
+                               .joinWithVertices(idsWithInitialLabels,
+                                               new MapFunction<Tuple2<Long, 
Long>, Long>() {
+                                                       public Long 
map(Tuple2<Long, Long> value) {
+                                                               return value.f1;
+                                                       }
+                                               }).run(new 
LabelPropagation<String>(numIterations))
+                               .getVertices();
+
+               verticesWithCommunity.print();
+
+               env.execute();
+       }
 
        public static final class ExtractMismatchSongIds implements 
MapFunction<String, Tuple1<String>> {
+
                public Tuple1<String> map(String value) {
-                       String[] tokens = value.split("\\s+"); 
+                       String[] tokens = value.split("\\s+");
                        String songId = tokens[1].substring(1);
                        return new Tuple1<String>(songId);
                }
-    }
-
-       public static final class FilterOutMismatches implements 
CoGroupFunction<Tuple3<String, String, Integer>, 
-       Tuple1<String>, Tuple3<String, String, Integer>> {
-               public void coGroup(
-                               Iterable<Tuple3<String, String, Integer>> 
triplets,
-                               Iterable<Tuple1<String>> invalidSongs,
-                               Collector<Tuple3<String, String, Integer>> out) 
{
+       }
+
+       public static final class FilterOutMismatches implements 
CoGroupFunction<Tuple3<String, String, Integer>,
+               Tuple1<String>, Tuple3<String, String, Integer>> {
+
+               public void coGroup(Iterable<Tuple3<String, String, Integer>> 
triplets,
+                               Iterable<Tuple1<String>> invalidSongs, 
Collector<Tuple3<String, String, Integer>> out) {
+
                        if (!invalidSongs.iterator().hasNext()) {
                                // this is a valid triplet
                                for (Tuple3<String, String, Integer> triplet : 
triplets) {
-                                       out.collect(triplet);                   
                
+                                       out.collect(triplet);
                                }
                        }
                }
-    }
+       }
 
-       public static final class FilterSongNodes implements 
FilterFunction<Tuple2<String, String>> {
+       public static final class FilterSongNodes implements 
FilterFunction<Tuple2<String, String>> {
                public boolean filter(Tuple2<String, String> value) throws 
Exception {
                        return !value.f1.equals("");
                }
-    }
+       }
 
-       public static final class GetTopSongPerUser implements 
EdgesFunctionWithVertexValue
-               <String, NullValue, Integer, Tuple2<String, String>> {
-               public Tuple2<String, String> iterateEdges(Vertex<String, 
NullValue> vertex,    
+       public static final class GetTopSongPerUser     implements 
EdgesFunctionWithVertexValue<String, NullValue, Integer,
+               Tuple2<String, String>> {
+
+               public Tuple2<String, String> iterateEdges(Vertex<String, 
NullValue> vertex, 
                                Iterable<Edge<String, Integer>> edges) {
+
                        int maxPlaycount = 0;
                        String topSong = "";
-                       for (Edge<String, Integer> edge: edges) {
+                       for (Edge<String, Integer> edge : edges) {
                                if (edge.getValue() > maxPlaycount) {
                                        maxPlaycount = edge.getValue();
                                        topSong = edge.getTarget();
                                }
                        }
-                       return new Tuple2<String, String> (vertex.getId(), 
topSong);
+                       return new Tuple2<String, String>(vertex.getId(), 
topSong);
                }
-    }
+       }
+
+       public static final class CreateSimilarUserEdges implements 
GroupReduceFunction<Edge<String, Integer>,
+               Edge<String, NullValue>> {
 
-       public static final class CreateSimilarUserEdges implements 
GroupReduceFunction<Edge<String, Integer>,
-               Edge<String, NullValue>> {
                public void reduce(Iterable<Edge<String, Integer>> edges, 
Collector<Edge<String, NullValue>> out) {
                        List<String> listeners = new ArrayList<String>();
                        for (Edge<String, Integer> edge : edges) {
                                listeners.add(edge.getSource());
                        }
-                       for (int i=0; i < listeners.size()-1; i++) {
-                               out.collect(new Edge<String, 
NullValue>(listeners.get(i), listeners.get(i+1),
-                                               NullValue.getInstance()));
+                       for (int i = 0; i < listeners.size() - 1; i++) {
+                               out.collect(new Edge<String, 
NullValue>(listeners.get(i),
+                                               listeners.get(i + 1), 
NullValue.getInstance()));
                        }
                }
-    }
+       }
 
-       public static final class AssignInitialLabelReducer implements 
GroupReduceFunction<Vertex<String, Long>,
-               Tuple2<String, Long>> {
-               public void reduce(Iterable<Vertex<String, Long>> vertices, 
Collector<Tuple2<String, Long>> out) {
+       public static final class AssignInitialLabelReducer implements 
GroupReduceFunction<Vertex<String, Long>,
+               Tuple2<String, Long>> {
+
+               public void reduce(Iterable<Vertex<String, Long>> vertices,     
Collector<Tuple2<String, Long>> out) {
                        long label = 0;
                        for (Vertex<String, Long> vertex : vertices) {
                                out.collect(new Tuple2<String, 
Long>(vertex.getId(), label));
                                label++;
                        }
                }
-       }
+       }
 
        @Override
        public String getDescription() {
                return "Music Profiles Example";
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index e3f815a..400508c 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -1,80 +1,102 @@
-package flink.graphs.example;
-
-
-import flink.graphs.*;
-import flink.graphs.library.PageRank;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.*;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.PageRank;
 import org.apache.flink.util.Collector;
 
 public class PageRankExample implements ProgramDescription {
 
-    @SuppressWarnings("serial")
-       public static void main (String [] args) throws Exception {
+       @SuppressWarnings("serial")
+       public static void main(String[] args) throws Exception {
+
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               DataSet<Vertex<Long, Double>> pages = getPagesDataSet(env);
+
+               DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+
+               Graph<Long, Double, Double> network = Graph.fromDataSet(pages, 
links, env);
 
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple2<Long, Long>> vertexOutDegrees = 
network.outDegrees();
 
-        DataSet<Vertex<Long,Double>> pages = getPagesDataSet(env);
+               // assign the transition probabilities as the edge weights
+               Graph<Long, Double, Double> networkWithWeights = network
+                               .joinWithEdgesOnSource(vertexOutDegrees,
+                                               new MapFunction<Tuple2<Double, 
Long>, Double>() {
+                                                       public Double 
map(Tuple2<Double, Long> value) {
+                                                               return value.f0 
/ value.f1;
+                                                       }
+                                               });
 
-        DataSet<Edge<Long,Double>> links = getLinksDataSet(env);
+               DataSet<Vertex<Long, Double>> pageRanks = 
networkWithWeights.run(
+                               new PageRank<Long>(numPages, DAMPENING_FACTOR, 
maxIterations))
+                               .getVertices();
 
-        Graph<Long, Double, Double> network = new Graph<Long, Double, 
Double>(pages, links, env);
-        
-        DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-        
-        // assign the transition probabilities as the edge weights
-        Graph<Long, Double, Double> networkWithWeights = 
network.joinWithEdgesOnSource(vertexOutDegrees, 
-                       new MapFunction<Tuple2<Double, Long>, Double>() {
-                                       public Double map(Tuple2<Double, Long> 
value) {
-                                               return value.f0 / value.f1;
+               pageRanks.print();
+
+               env.execute();
+       }
+
+       @Override
+       public String getDescription() {
+               return "PageRank";
+       }
+
+       private static final double DAMPENING_FACTOR = 0.85;
+       private static long numPages = 10;
+       private static int maxIterations = 10;
+
+       @SuppressWarnings("serial")
+       private static DataSet<Vertex<Long, Double>> 
getPagesDataSet(ExecutionEnvironment env) {
+               return env.generateSequence(1, numPages).map(
+                               new MapFunction<Long, Vertex<Long, Double>>() {
+                                       @Override
+                                       public Vertex<Long, Double> map(Long l) 
throws Exception {
+                                               return new Vertex<Long, 
Double>(l, 1.0 / numPages);
                                        }
                                });
 
-        DataSet<Vertex<Long,Double>> pageRanks =
-                       networkWithWeights.run(new PageRank<Long>(numPages, 
DAMPENING_FACTOR, maxIterations)).getVertices();
-
-        pageRanks.print();
-
-        env.execute();
-    }
-
-    @Override
-    public String getDescription() {
-        return "PageRank";
-    }
-
-    private static final double DAMPENING_FACTOR = 0.85;
-    private static long numPages = 10;
-    private static int maxIterations = 10;
-
-    @SuppressWarnings("serial")
-       private static DataSet<Vertex<Long,Double>> 
getPagesDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numPages)
-                    .map(new MapFunction<Long, Vertex<Long, Double>>() {
-                        @Override
-                        public Vertex<Long, Double> map(Long l) throws 
Exception {
-                            return new Vertex<Long, Double>(l, 1.0 / numPages);
-                        }
-                    });
-
-    }
-
-    @SuppressWarnings("serial")
-    private static DataSet<Edge<Long, Double>> 
getLinksDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numPages)
-                    .flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
-                        @Override
-                        public void flatMap(Long key, Collector<Edge<Long, 
Double>> out) throws Exception {
-                            int numOutEdges = (int) (Math.random() * (numPages 
/ 2));
-                            for (int i = 0; i < numOutEdges; i++) {
-                                long target = (long) (Math.random() * 
numPages) + 1;
-                                out.collect(new Edge<Long, Double>(key, 
target, 1.0));
-                            }
-                        }
-                    });
-    }
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, Double>> 
getLinksDataSet(ExecutionEnvironment env) {
+               return env.generateSequence(1, numPages).flatMap(
+                               new FlatMapFunction<Long, Edge<Long, Double>>() 
{
+                                       @Override
+                                       public void flatMap(Long key,
+                                                       Collector<Edge<Long, 
Double>> out) throws Exception {
+                                               int numOutEdges = (int) 
(Math.random() * (numPages / 2));
+                                               for (int i = 0; i < 
numOutEdges; i++) {
+                                                       long target = (long) 
(Math.random() * numPages) + 1;
+                                                       out.collect(new 
Edge<Long, Double>(key, target, 1.0));
+                                               }
+                                       }
+                               });
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 75e33dc..7f31525 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -1,41 +1,59 @@
-package flink.graphs.example;
-
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.example.utils.ExampleUtils;
-import flink.graphs.library.SingleSourceShortestPaths;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ExampleUtils;
+import org.apache.flink.graph.library.SingleSourceShortestPaths;
 
 public class SingleSourceShortestPathsExample implements ProgramDescription {
 
-    private static int maxIterations = 5;
+       private static int maxIterations = 5;
 
-    public static void main (String [] args) throws Exception {
+       public static void main(String[] args) throws Exception {
 
-        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<Vertex<Long, Double>> vertices = 
ExampleUtils.getLongDoubleVertexData(env);
+               DataSet<Vertex<Long, Double>> vertices = 
ExampleUtils.getLongDoubleVertexData(env);
 
-        DataSet<Edge<Long, Double>> edges = 
ExampleUtils.getLongDoubleEdgeData(env);
+               DataSet<Edge<Long, Double>> edges = 
ExampleUtils.getLongDoubleEdgeData(env);
 
-        Long srcVertexId = 1L;
+               Long srcVertexId = 1L;
 
-        Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env);
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
 
-        DataSet<Vertex<Long,Double>> singleSourceShortestPaths =
-                graph.run(new SingleSourceShortestPaths<Long>(srcVertexId, 
maxIterations)).getVertices();
+               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
+                               .run(new 
SingleSourceShortestPaths<Long>(srcVertexId,
+                                               maxIterations)).getVertices();
 
-        singleSourceShortestPaths.print();
+               singleSourceShortestPaths.print();
 
-        env.execute();
-    }
+               env.execute();
+       }
 
-    @Override
-    public String getDescription() {
-        return "Single Source Shortest Paths";
-    }
+       @Override
+       public String getDescription() {
+               return "Single Source Shortest Paths";
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
index 8c131e4..3f1c7bb 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
@@ -1,10 +1,27 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.types.NullValue;
 
-import flink.graphs.Edge;
-
 public class EdgeWithLongIdNullValueParser extends RichMapFunction<String, 
Edge<Long, NullValue>> {
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
index 4588230..d986478 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
@@ -1,4 +1,22 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -10,46 +28,47 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Vertex;
-
 public class ExampleUtils {
 
        @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-       public static void printResult(DataSet set, String msg, 
ExecutionEnvironment env) {
+       public static void printResult(DataSet set, String msg) {
                set.output(new PrintingOutputFormatWithMessage(msg) {
                });
        }
-       
-       public static class PrintingOutputFormatWithMessage<T> implements 
OutputFormat<T> {
+
+       public static class PrintingOutputFormatWithMessage<T> implements
+                       OutputFormat<T> {
 
                private static final long serialVersionUID = 1L;
-               
+
                private transient PrintStream stream;
-               
+
                private transient String prefix;
-               
+
                private String message;
-               
+
                // 
--------------------------------------------------------------------------------------------
-               
+
                /**
                 * Instantiates a printing output format that prints to 
standard out.
                 */
-               public PrintingOutputFormatWithMessage() {}
-               
+               public PrintingOutputFormatWithMessage() {
+               }
+
                public PrintingOutputFormatWithMessage(String msg) {
-                       this.message = msg;     
+                       this.message = msg;
                }
 
                @Override
                public void open(int taskNumber, int numTasks) {
                        // get the target stream
                        this.stream = System.out;
-                       
+
                        // set the prefix to message
                        this.prefix = message + ": ";
                }
@@ -58,8 +77,7 @@ public class ExampleUtils {
                public void writeRecord(T record) {
                        if (this.prefix != null) {
                                this.stream.println(this.prefix + 
record.toString());
-                       }
-                       else {
+                       } else {
                                this.stream.println(record.toString());
                        }
                }
@@ -69,41 +87,46 @@ public class ExampleUtils {
                        this.stream = null;
                        this.prefix = null;
                }
-               
+
                @Override
                public String toString() {
                        return "Print to System.out";
                }
 
                @Override
-               public void configure(Configuration parameters) {}
+               public void configure(Configuration parameters) {
+               }
        }
 
        @SuppressWarnings("serial")
-       public static DataSet<Vertex<Long, NullValue>> 
getVertexIds(ExecutionEnvironment env,
-                       final long numVertices) {
-        return env.generateSequence(1, numVertices)
-                .map(new MapFunction<Long, Vertex<Long, NullValue>>() {
-                    public Vertex<Long, NullValue> map(Long l) {
-                        return new Vertex<Long, NullValue>(l, 
NullValue.getInstance());
-                    }
-                });
+       public static DataSet<Vertex<Long, NullValue>> getVertexIds(
+                       ExecutionEnvironment env, final long numVertices) {
+               return env.generateSequence(1, numVertices).map(
+                               new MapFunction<Long, Vertex<Long, 
NullValue>>() {
+                                       public Vertex<Long, NullValue> map(Long 
l) {
+                                               return new Vertex<Long, 
NullValue>(l, NullValue
+                                                               .getInstance());
+                                       }
+                               });
        }
 
        @SuppressWarnings("serial")
-       public static DataSet<Edge<Long, NullValue>> 
getRandomEdges(ExecutionEnvironment env,
-                       final long numVertices) {
-               return env.generateSequence(1, numVertices)
-                       .flatMap(new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
-                           @Override
-                           public void flatMap(Long key, Collector<Edge<Long, 
NullValue>> out) throws Exception {
-                               int numOutEdges = (int) (Math.random() * 
(numVertices / 2));
-                               for (int i = 0; i < numOutEdges; i++) {
-                                   long target = (long) (Math.random() * 
numVertices) + 1;
-                                   out.collect(new Edge<Long, NullValue>(key, 
target, NullValue.getInstance()));
-                               }
-                           }
-                       });
+       public static DataSet<Edge<Long, NullValue>> getRandomEdges(
+                       ExecutionEnvironment env, final long numVertices) {
+               return env.generateSequence(1, numVertices).flatMap(
+                               new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
+                                       @Override
+                                       public void flatMap(Long key,
+                                                       Collector<Edge<Long, 
NullValue>> out)
+                                                       throws Exception {
+                                               int numOutEdges = (int) 
(Math.random() * (numVertices / 2));
+                                               for (int i = 0; i < 
numOutEdges; i++) {
+                                                       long target = (long) 
(Math.random() * numVertices) + 1;
+                                                       out.collect(new 
Edge<Long, NullValue>(key, target,
+                                                                       
NullValue.getInstance()));
+                                               }
+                                       }
+                               });
        }
 
        public static final DataSet<Vertex<Long, Double>> 
getLongDoubleVertexData(
@@ -117,7 +140,7 @@ public class ExampleUtils {
 
                return env.fromCollection(vertices);
        }
-       
+
        public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
                        ExecutionEnvironment env) {
                List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
@@ -128,8 +151,7 @@ public class ExampleUtils {
                edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
                edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
                edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-               
+
                return env.fromCollection(edges);
        }
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
index cfe9c88..0a7162d 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -1,4 +1,22 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index fc77d0b..69d7713 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -1,10 +1,28 @@
-package flink.graphs.library;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.*;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+package org.apache.flink.graph.library;
 
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.NullValue;
 
 import java.io.Serializable;
@@ -13,43 +31,43 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * An implementation of the label propagation algorithm.
- * The iterative algorithm detects communities by propagating labels.
- * In each iteration, a vertex adopts the label that is most frequent among 
its neighbors' labels.
- * Labels are represented by Longs and we assume a total ordering among them, 
in order to break ties.
- * The algorithm converges when no vertex changes its value or the maximum 
number of iterations have been reached.
- * Note that different initializations might lead to different results.
- *
+ * An implementation of the label propagation algorithm. The iterative 
algorithm
+ * detects communities by propagating labels. In each iteration, a vertex 
adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations has been reached. Note that different
+ * initializations might lead to different results.
+ * 
  */
 @SuppressWarnings("serial")
-public class LabelPropagation<K extends Comparable<K> & Serializable> 
implements GraphAlgorithm<K, Long, NullValue> {
+public class LabelPropagation<K extends Comparable<K> & Serializable>
+               implements GraphAlgorithm<K, Long, NullValue> {
 
-    private final int maxIterations;
+       private final int maxIterations;
 
-    public LabelPropagation(int maxIterations) {
-        this.maxIterations = maxIterations;
-    }
+       public LabelPropagation(int maxIterations) {
+               this.maxIterations = maxIterations;
+       }
 
-    @Override
-    public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+       @Override
+       public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
 
-       // iteratively adopt the most frequent label among the neighbors
-       // of each vertex
-       return input.runVertexCentricIteration(
-                new UpdateVertexLabel<K>(),
-                new SendNewLabelToNeighbors<K>(),
-                maxIterations
-        );
-    }
+               // iteratively adopt the most frequent label among the neighbors
+               // of each vertex
+               return input.runVertexCentricIteration(new 
UpdateVertexLabel<K>(),
+                               new SendNewLabelToNeighbors<K>(), 
maxIterations);
+       }
 
-    /**
-     * Function that updates the value of a vertex by adopting the most 
frequent label
-     * among its in-neighbors
-     */
-    public static final class UpdateVertexLabel<K extends Comparable<K> & 
Serializable>
-            extends VertexUpdateFunction<K, Long, Long> {
+       /**
+        * Function that updates the value of a vertex by adopting the most 
frequent
+        * label among its in-neighbors
+        */
+       public static final class UpdateVertexLabel<K extends Comparable<K> & 
Serializable>
+                       extends VertexUpdateFunction<K, Long, Long> {
 
-               public void updateVertex(K vertexKey, Long vertexValue, 
MessageIterator<Long> inMessages) {
+               public void updateVertex(K vertexKey, Long vertexValue,
+                               MessageIterator<Long> inMessages) {
                        Map<Long, Long> labelsWithFrequencies = new 
HashMap<Long, Long>();
 
                        long maxFrequency = 1;
@@ -60,12 +78,12 @@ public class LabelPropagation<K extends Comparable<K> & 
Serializable> implements
                                if (labelsWithFrequencies.containsKey(msg)) {
                                        long currentFreq = 
labelsWithFrequencies.get(msg);
                                        labelsWithFrequencies.put(msg, 
currentFreq + 1);
-                               }
-                               else {
+                               } else {
                                        labelsWithFrequencies.put(msg, 1L);
                                }
                        }
-                       // select the most frequent label: if two or more 
labels have the same frequency,
+                       // select the most frequent label: if two or more 
labels have the
+                       // same frequency,
                        // the node adopts the label with the highest value
                        for (Entry<Long, Long> entry : 
labelsWithFrequencies.entrySet()) {
                                if (entry.getValue() == maxFrequency) {
@@ -73,8 +91,7 @@ public class LabelPropagation<K extends Comparable<K> & 
Serializable> implements
                                        if (entry.getKey() > mostFrequentLabel) 
{
                                                mostFrequentLabel = 
entry.getKey();
                                        }
-                               }
-                               else if (entry.getValue() > maxFrequency) {
+                               } else if (entry.getValue() > maxFrequency) {
                                        maxFrequency = entry.getValue();
                                        mostFrequentLabel = entry.getKey();
                                }
@@ -83,16 +100,16 @@ public class LabelPropagation<K extends Comparable<K> & 
Serializable> implements
                        // set the new vertex value
                        setNewVertexValue(mostFrequentLabel);
                }
-    }
+       }
 
-    /**
-     * Sends the vertex label to all out-neighbors
-     */
-    public static final class SendNewLabelToNeighbors<K extends Comparable<K> 
& Serializable>
-            extends MessagingFunction<K, Long, Long, NullValue> {
+       /**
+        * Sends the vertex label to all out-neighbors
+        */
+       public static final class SendNewLabelToNeighbors<K extends 
Comparable<K> & Serializable>
+                       extends MessagingFunction<K, Long, Long, NullValue> {
 
-       public void sendMessages(K vertexKey, Long newLabel) {
-            sendMessageToAllNeighbors(newLabel);
-        }
-    }
+               public void sendMessages(K vertexKey, Long newLabel) {
+                       sendMessageToAllNeighbors(newLabel);
+               }
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index d29a9dc..39b8ef1 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -1,76 +1,95 @@
-package flink.graphs.library;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.GraphAlgorithm;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
-public class PageRank<K extends Comparable<K> & Serializable> implements 
GraphAlgorithm<K, Double, Double> {
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
-    private long numVertices;
-    private double beta;
-    private int maxIterations;
+public class PageRank<K extends Comparable<K> & Serializable> implements
+               GraphAlgorithm<K, Double, Double> {
 
-    public PageRank(long numVertices, double beta, int maxIterations) {
-        this.numVertices = numVertices;
-        this.beta = beta;
-        this.maxIterations = maxIterations;
-    }
+       private long numVertices;
+       private double beta;
+       private int maxIterations;
 
-    @Override
-    public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
-        return network.runVertexCentricIteration(
-                new VertexRankUpdater<K>(numVertices, beta),
-                new RankMessenger<K>(),
-                maxIterations
-        );
-    }
+       public PageRank(long numVertices, double beta, int maxIterations) {
+               this.numVertices = numVertices;
+               this.beta = beta;
+               this.maxIterations = maxIterations;
+       }
 
+       @Override
+       public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+               return network.runVertexCentricIteration(new 
VertexRankUpdater<K>(
+                               numVertices, beta), new RankMessenger<K>(), 
maxIterations);
+       }
 
-    /**
-     * Function that updates the rank of a vertex by summing up the partial 
ranks from all incoming messages
-     * and then applying the dampening formula.
-     */
-    @SuppressWarnings("serial")
-       public static final class VertexRankUpdater<K extends Comparable<K> & 
Serializable> extends VertexUpdateFunction<K, Double, Double> {
+       /**
+        * Function that updates the rank of a vertex by summing up the partial
+        * ranks from all incoming messages and then applying the dampening 
formula.
+        */
+       @SuppressWarnings("serial")
+       public static final class VertexRankUpdater<K extends Comparable<K> & 
Serializable>
+                       extends VertexUpdateFunction<K, Double, Double> {
 
-        private final long numVertices;
-        private final double beta;
+               private final long numVertices;
+               private final double beta;
 
-        public VertexRankUpdater(long numVertices, double beta) {
-            this.numVertices = numVertices;
-            this.beta = beta;
-        }
+               public VertexRankUpdater(long numVertices, double beta) {
+                       this.numVertices = numVertices;
+                       this.beta = beta;
+               }
 
-        @Override
-        public void updateVertex(K vertexKey, Double vertexValue, 
MessageIterator<Double> inMessages) {
-            double rankSum = 0.0;
-            for (double msg : inMessages) {
-                rankSum += msg;
-            }
+               @Override
+               public void updateVertex(K vertexKey, Double vertexValue,
+                               MessageIterator<Double> inMessages) {
+                       double rankSum = 0.0;
+                       for (double msg : inMessages) {
+                               rankSum += msg;
+                       }
 
-            // apply the dampening factor / random jump
-            double newRank = (beta * rankSum) + (1-beta)/numVertices;
-            setNewVertexValue(newRank);
-        }
-    }
+                       // apply the dampening factor / random jump
+                       double newRank = (beta * rankSum) + (1 - beta) / 
numVertices;
+                       setNewVertexValue(newRank);
+               }
+       }
 
-    /**
-     * Distributes the rank of a vertex among all target vertices according to 
the transition probability,
-     * which is associated with an edge as the edge value.
-     */
-    @SuppressWarnings("serial")
-       public static final class RankMessenger<K extends Comparable<K> & 
Serializable> extends MessagingFunction<K, Double, Double, Double> {
+       /**
+        * Distributes the rank of a vertex among all target vertices according 
to
+        * the transition probability, which is associated with an edge as the 
edge
+        * value.
+        */
+       @SuppressWarnings("serial")
+       public static final class RankMessenger<K extends Comparable<K> & 
Serializable>
+                       extends MessagingFunction<K, Double, Double, Double> {
 
-        @Override
-        public void sendMessages(K vertexId, Double newRank) {
-            for (Edge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.getTarget(), newRank * edge.getValue());
-            }
-        }
-    }
+               @Override
+               public void sendMessages(K vertexId, Double newRank) {
+                       for (Edge<K, Double> edge : getOutgoingEdges()) {
+                               sendMessageTo(edge.getTarget(), newRank * 
edge.getValue());
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 0da8a90..2f575e7 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -1,94 +1,114 @@
-package flink.graphs.library;
-
-import flink.graphs.*;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> {
+public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
+               implements GraphAlgorithm<K, Double, Double> {
+
+       private final K srcVertexId;
+       private final Integer maxIterations;
 
-    private final K srcVertexId;
-    private final Integer maxIterations;
+       public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+               this.srcVertexId = srcVertexId;
+               this.maxIterations = maxIterations;
+       }
 
-    public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
-        this.srcVertexId = srcVertexId;
-        this.maxIterations = maxIterations;
-    }
+       @Override
+       public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
 
-    @Override
-    public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+               return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+                               .runVertexCentricIteration(new VertexDistanceUpdater<K>(),
+                                               new MinDistanceMessenger<K>(), maxIterations);
+       }
 
-       return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-                       .runVertexCentricIteration(
-                new VertexDistanceUpdater<K>(),
-                new MinDistanceMessenger<K>(),
-                maxIterations
-        );
-    }
+       public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
+                       implements MapFunction<Vertex<K, Double>, Double> {
 
-    public static final class InitVerticesMapper<K extends Comparable<K> & 
Serializable> 
-       implements MapFunction<Vertex<K,Double>, Double> {
+               private K srcVertexId;
 
-       private K srcVertexId;
+               public InitVerticesMapper(K srcId) {
+                       this.srcVertexId = srcId;
+               }
 
-       public InitVerticesMapper(K srcId) {
-               this.srcVertexId = srcId;
-       }
-               
                public Double map(Vertex<K, Double> value) {
                        if (value.f0.equals(srcVertexId)) {
                                return 0.0;
-                       }
-                       else {
+                       } else {
                                return Double.MAX_VALUE;
                        }
                }
-    }
-
-    /**
-     * Function that updates the value of a vertex by picking the minimum 
distance from all incoming messages.
-     *
-     * @param <K>
-     */
-    public static final class VertexDistanceUpdater<K extends Comparable<K> & 
Serializable>
-            extends VertexUpdateFunction<K, Double, Double> {
-
-        @Override
-        public void updateVertex(K vertexKey, Double vertexValue, 
MessageIterator<Double> inMessages) {
-
-            Double minDistance = Double.MAX_VALUE;
-
-            for (double msg : inMessages) {
-                if (msg < minDistance) {
-                    minDistance = msg;
-                }
-            }
-
-            if (vertexValue > minDistance) {
-                setNewVertexValue(minDistance);
-            }
-        }
-    }
-
-    /**
-     * Distributes the minimum distance associated with a given vertex among 
all the target vertices
-     * summed up with the edge's value.
-     *
-     * @param <K>
-     */
-    public static final class MinDistanceMessenger<K extends Comparable<K> & 
Serializable>
-            extends MessagingFunction<K, Double, Double, Double> {
-
-        @Override
-        public void sendMessages(K vertexKey, Double newDistance) throws 
Exception {
-            for (Edge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
-            }
-        }
-    }
+       }
+
+       /**
+        * Function that updates the value of a vertex by picking the minimum
+        * distance from all incoming messages.
+        * 
+        * @param <K>
+        */
+       public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable>
+                       extends VertexUpdateFunction<K, Double, Double> {
+
+               @Override
+               public void updateVertex(K vertexKey, Double vertexValue,
+                               MessageIterator<Double> inMessages) {
+
+                       Double minDistance = Double.MAX_VALUE;
+
+                       for (double msg : inMessages) {
+                               if (msg < minDistance) {
+                                       minDistance = msg;
+                               }
+                       }
+
+                       if (vertexValue > minDistance) {
+                               setNewVertexValue(minDistance);
+                       }
+               }
+       }
+
+       /**
+        * Distributes the minimum distance associated with a given vertex among all
+        * the target vertices summed up with the edge's value.
+        * 
+        * @param <K>
+        */
+       public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable>
+                       extends MessagingFunction<K, Double, Double, Double> {
+
+               @Override
+               public void sendMessages(K vertexKey, Double newDistance)
+                               throws Exception {
+                       for (Edge<K, Double> edge : getOutgoingEdges()) {
+                               sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
+                       }
+               }
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
deleted file mode 100644
index 695b2b8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * 
- */
-package flink.graphs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
index 08ba2c0..d6fdc8a 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index ab451bb..e8a297f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -26,11 +26,10 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-
 /**
  * The base class for functions that produce messages between vertices as a 
part of a {@link VertexCentricIteration}.
  * 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 5f89e90..9c72485 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -38,11 +38,10 @@ import 
org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Vertex;
-
 /**
  * This class represents iterative graph computations, programmed in a 
vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The 
paradigm has also been
@@ -106,7 +105,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
        private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
                        MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
                        DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, 
-                       int maximumNumberOfIterations, boolean edgeHasValueMarker)
+                       int maximumNumberOfIterations)
        {
                Validate.notNull(uf);
                Validate.notNull(mf);
@@ -319,7 +318,7 @@ public class VertexCentricIteration<VertexKey extends 
Comparable<VertexKey> & Se
                                        MessagingFunction<VertexKey, 
VertexValue, Message, EdgeValue> mf,
                                        int maximumNumberOfIterations)
        {
-               return new VertexCentricIteration<VertexKey, VertexValue, 
Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
+               return new VertexCentricIteration<VertexKey, VertexValue, 
Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index e30451c..1157a18 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -16,18 +16,17 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Vertex;
-
 /**
  * This class must be extended by functions that compute the state of the 
vertex depending on the old state and the
  * incoming messages. The central method is {@link #updateVertex(Comparable, 
Object, MessageIterator)}, which is

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
index 86103a6..a7b7b62 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-
-import flink.graphs.Edge;
+import org.apache.flink.graph.Edge;
 
 public class EdgeToTuple3Map<K extends Comparable<K> & Serializable, 
        EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple3<K, 
K, EV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 0b0dc18..aba1c14 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -1,4 +1,22 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -14,27 +32,26 @@ public class GraphUtils {
 
        @SuppressWarnings({ "unchecked", "rawtypes" })
        public static DataSet<Integer> count(DataSet set, ExecutionEnvironment 
env) {
-               List<Integer> list = new ArrayList<>();
+               List<Integer> list = new ArrayList<Integer>();
                list.add(0);
                DataSet<Integer> initialCount = env.fromCollection(list);
-        return set
-                .map(new OneMapper())
-                .union(initialCount)
-                .reduce(new AddOnesReducer())
-                .first(1);
-    }
+               return set.map(new OneMapper()).union(initialCount)
+                               .reduce(new AddOnesReducer()).first(1);
+       }
+
+       private static final class OneMapper<T extends Tuple> implements
+                       MapFunction<T, Integer> {
+               @Override
+               public Integer map(T o) throws Exception {
+                       return 1;
+               }
+       }
 
-       private static final class OneMapper<T extends Tuple> implements 
MapFunction<T, Integer> {
-            @Override
-            public Integer map(T o) throws Exception {
-                return 1;
-            }
-    }
-    
-    private static final class AddOnesReducer implements 
ReduceFunction<Integer> {
-            @Override
-            public Integer reduce(Integer one, Integer two) throws Exception {
-                return one + two;
-            }
-    } 
+       private static final class AddOnesReducer implements
+                       ReduceFunction<Integer> {
+               @Override
+               public Integer reduce(Integer one, Integer two) throws Exception {
+                       return one + two;
+               }
+       }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
index 893ae95..d58e4ff 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-
-import flink.graphs.Vertex;
+import org.apache.flink.graph.Vertex;
 
 public class Tuple2ToVertexMap<K extends Comparable<K> & Serializable, 
        VV extends Serializable> implements MapFunction<Tuple2<K, VV>, 
Vertex<K, VV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
index 2a6cb23..3668dd2 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-
-import flink.graphs.Edge;
+import org.apache.flink.graph.Edge;
 
 /**
  * create an Edge DataSetfrom a Tuple3 dataset

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
index 30f867d..318e1ed 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-
-import flink.graphs.Vertex;
+import org.apache.flink.graph.Vertex;
 
 public class VertexToTuple2Map<K extends Comparable<K> & Serializable, 
        VV extends Serializable> implements MapFunction<Vertex<K, VV>, 
Tuple2<K, VV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 88dfcde..47209b3 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -1,21 +1,39 @@
-package flink.graphs.validation;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
 import java.io.Serializable;
 
 import org.apache.flink.api.java.DataSet;
-
-import flink.graphs.Graph;
+import org.apache.flink.graph.Graph;
 
 /**
  * A validation method for different types of Graphs
- *
+ * 
  * @param <K>
  * @param <VV>
  * @param <EV>
  */
 @SuppressWarnings("serial")
-public abstract class GraphValidator<K extends Comparable<K> & Serializable, 
VV extends Serializable,
-        EV extends Serializable> implements Serializable{
+public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+               implements Serializable {
 
-    public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
+       public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 6b7a619..b043f3c 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -1,77 +1,87 @@
-package flink.graphs.validation;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.utils.GraphUtils;
-
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, 
VV extends Serializable,
-        EV extends Serializable> extends  GraphValidator<K, VV, EV> {
-
-    /**
-     * Checks that the edge set input contains valid vertex Ids,
-     * i.e. that they also exist in the vertex input set.
-     * @return a singleton DataSet<Boolean> stating whether a graph is valid
-     * with respect to its vertex ids.
-     */
-    @Override
-    public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
-        DataSet<Tuple1<K>> edgeIds = graph.getEdges().flatMap(new 
MapEdgeIds<K, EV>()).distinct();
-        DataSet<K> invalidIds = 
graph.getVertices().coGroup(edgeIds).where(0).equalTo(0)
-                .with(new GroupInvalidIds<K, VV>()).first(1);
-
-        return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), 
graph.getContext())
-                .map(new InvalidIdsMap());
-    }
-
-    private static final class MapEdgeIds<K extends Comparable<K> & 
Serializable,
-            EV extends Serializable> implements FlatMapFunction<Edge<K, EV>,
-            Tuple1<K>> {
-
-        @Override
-        public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-            out.collect(new Tuple1<K>(edge.f0));
-            out.collect(new Tuple1<K>(edge.f1));
-        }
-    }
+public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, 
VV extends Serializable, EV extends Serializable>
+               extends GraphValidator<K, VV, EV> {
 
-    private static final class GroupInvalidIds<K extends Comparable<K> & 
Serializable,
-            VV extends Serializable> implements CoGroupFunction<Vertex<K, VV>, 
Tuple1<K>, K> {
+       /**
+        * Checks that the edge set input contains valid vertex Ids, i.e. that they
+        * also exist in the vertex input set.
+        * 
+        * @return a singleton DataSet<Boolean> stating whether a graph is valid
+        *         with respect to its vertex ids.
+        */
+       @Override
+       public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
+               DataSet<Tuple1<K>> edgeIds = graph.getEdges()
+                               .flatMap(new MapEdgeIds<K, EV>()).distinct();
+               DataSet<K> invalidIds = 
graph.getVertices().coGroup(edgeIds).where(0)
+                               .equalTo(0).with(new GroupInvalidIds<K, 
VV>()).first(1);
 
-        @Override
-        public void coGroup(Iterable<Vertex<K, VV>> vertexId,
-                            Iterable<Tuple1<K>> edgeId, Collector<K> out) {
-            if (!(vertexId.iterator().hasNext())) {
-                // found an id that doesn't exist in the vertex set
-                out.collect(edgeId.iterator().next().f0);
-            }
-        }
-    }
+               return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
+                               graph.getContext()).map(new InvalidIdsMap());
+       }
 
-    private static final class KToTupleMap<K> implements MapFunction<K, 
Tuple1<K>> {
+       private static final class MapEdgeIds<K extends Comparable<K> & 
Serializable, EV extends Serializable>
+                       implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+               public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
+                       out.collect(new Tuple1<K>(edge.f0));
+                       out.collect(new Tuple1<K>(edge.f1));
+               }
+       }
 
-        @Override
-        public Tuple1<K> map (K key)throws Exception {
-            return new Tuple1<>(key);
-        }
-    }
+       private static final class GroupInvalidIds<K extends Comparable<K> & 
Serializable, VV extends Serializable>
+                       implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> 
{
+               public void coGroup(Iterable<Vertex<K, VV>> vertexId,
+                               Iterable<Tuple1<K>> edgeId, Collector<K> out) {
+                       if (!(vertexId.iterator().hasNext())) {
+                               // found an id that doesn't exist in the vertex set
+                               out.collect(edgeId.iterator().next().f0);
+                       }
+               }
+       }
 
-    private static final class InvalidIdsMap implements MapFunction<Integer, 
Boolean> {
+       private static final class KToTupleMap<K> implements MapFunction<K, 
Tuple1<K>> {
+               public Tuple1<K> map(K key) throws Exception {
+                       return new Tuple1<K>(key);
+               }
+       }
 
-        @Override
-        public Boolean map (Integer numberOfInvalidIds)throws Exception {
-            return numberOfInvalidIds == 0;
-        }
-    }
+       private static final class InvalidIdsMap implements     
MapFunction<Integer, Boolean> {
+               public Boolean map(Integer numberOfInvalidIds) throws Exception 
{
+                       return numberOfInvalidIds == 0;
+               }
+       }
 
 }
\ No newline at end of file

Reply via email to