http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
new file mode 100644
index 0000000..2b4277d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.EuclideanGraphWeighing;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+       /** Number of vertices in the default graph; vertex ids run from 1 to this value. */
+       public static final int NUM_VERTICES = 9;
+
+       /**
+        * Default vertices in text form, one {@code id,coordinate,coordinate} line per vertex.
+        * Vertex {@code i} is listed with both coordinates equal to {@code i}.
+        */
+       public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+                       "6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+       /**
+        * Creates the default vertex data set: for every id {@code i} in 1..{@link #NUM_VERTICES}
+        * a vertex whose value is the point {@code (i, i)}.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default vertices
+        */
+       public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+               List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>();
+               for (int i = 1; i <= NUM_VERTICES; i++) {
+                       // valueOf(...) replaces the deprecated boxing constructors new Long(i) / new Double(i)
+                       vertices.add(new Vertex<Long, EuclideanGraphWeighing.Point>(Long.valueOf(i),
+                                       new EuclideanGraphWeighing.Point(Double.valueOf(i), Double.valueOf(i))));
+               }
+
+               return env.fromCollection(vertices);
+       }
+
+       /** Default edges in text form, one {@code source,target} line per edge (15 edges). */
+       public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+                       "3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+                       "7,8\n" + "7,9\n" +  "8,9";
+
+       /**
+        * Creates the default edge data set: the 15 edges listed in {@link #EDGES}, each with the
+        * value 0.0.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default edges
+        */
+       public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
+               edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
+               edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
+               edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
+               // (6, 8) is added exactly once so the collection agrees with EDGES and
+               // RESULTED_WEIGHTED_EDGES, which both list this edge a single time.
+               edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+               edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
+               edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
+               edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
+
+               return env.fromCollection(edges);
+       }
+
+       /**
+        * The edges of {@link #EDGES} with a third column, one {@code source,target,value} line per edge.
+        * Each value equals the Euclidean distance between the points {@code (source, source)} and
+        * {@code (target, target)}.
+        */
+       public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+                       "2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+                       "4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+                       "6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+                       "8,9,1.4142135623730951";
+
+       /** Private constructor: the class only exposes static members. */
+       private EuclideanGraphData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
new file mode 100644
index 0000000..99e363a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the IncrementalSSSP example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class IncrementalSSSPData {
+
+       /** Number of vertices in the default graph. */
+       public static final int NUM_VERTICES = 5;
+
+       /** Default vertices in text form, one {@code id,value} line per vertex. */
+       public static final String VERTICES =
+                       "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
+
+       /**
+        * Creates the default vertex data set: vertices 1 to 5 with the values 6.0, 2.0, 3.0, 1.0 and 0.0.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default vertices
+        */
+       public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+               // value of vertex (index + 1)
+               final double[] valueById = {6.0, 2.0, 3.0, 1.0, 0.0};
+
+               List<Vertex<Long, Double>> defaults = new ArrayList<Vertex<Long, Double>>();
+               for (int idx = 0; idx < valueById.length; idx++) {
+                       defaults.add(new Vertex<Long, Double>(idx + 1L, valueById[idx]));
+               }
+
+               return env.fromCollection(defaults);
+       }
+
+       /** Default edges in text form, one {@code source,target,value} line per edge. */
+       public static final String EDGES =
+                       "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" + "4,5,1.0";
+
+       /**
+        * Creates the default edge data set (the six edges listed in {@link #EDGES}).
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default edges
+        */
+       public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> all = new ArrayList<Edge<Long, Double>>();
+               all.add(edgeOf(1L, 3L, 3.0));
+               all.add(edgeOf(2L, 4L, 3.0));
+               all.add(edgeOf(2L, 5L, 2.0));
+               all.add(edgeOf(3L, 2L, 1.0));
+               all.add(edgeOf(3L, 5L, 5.0));
+               all.add(edgeOf(4L, 5L, 1.0));
+
+               return env.fromCollection(all);
+       }
+
+       /** The four edges returned by {@link #getDefaultEdgesInSSSP}, in text form. */
+       public static final String EDGES_IN_SSSP =
+                       "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0";
+
+       /**
+        * Creates the data set of the four edges listed in {@link #EDGES_IN_SSSP}.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the edges listed in {@code EDGES_IN_SSSP}
+        */
+       public static final DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> subset = new ArrayList<Edge<Long, Double>>();
+               subset.add(edgeOf(1L, 3L, 3.0));
+               subset.add(edgeOf(2L, 5L, 2.0));
+               subset.add(edgeOf(3L, 2L, 1.0));
+               subset.add(edgeOf(4L, 5L, 1.0));
+
+               return env.fromCollection(subset);
+       }
+
+       /** Source id, as text, of the edge returned by {@link #getDefaultEdgeToBeRemoved()}. */
+       public static final String SRC_EDGE_TO_BE_REMOVED = "2";
+
+       /** Target id, as text, of the edge returned by {@link #getDefaultEdgeToBeRemoved()}. */
+       public static final String TRG_EDGE_TO_BE_REMOVED = "5";
+
+       /** Value, as text, of the edge returned by {@link #getDefaultEdgeToBeRemoved()}. */
+       public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
+
+       /**
+        * Returns the edge (2, 5) with value 2.0.
+        *
+        * @return a new edge instance on every call
+        */
+       public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+
+               return edgeOf(2L, 5L, 2.0);
+       }
+
+       /**
+        * Vertex lines {@code id,value} in which vertices 1, 2 and 3 carry {@link Double#MAX_VALUE}
+        * and vertices 4 and 5 carry 1.0 and 0.0. Presumably the expected result after removing the
+        * edge above; confirm against the example program.
+        */
+       public static final String RESULTED_VERTICES =
+                       "1," + Double.MAX_VALUE + "\n"
+                       + "2," + Double.MAX_VALUE + "\n"
+                       + "3," + Double.MAX_VALUE + "\n"
+                       + "4,1.0\n"
+                       + "5,0.0";
+
+       /** Builds one edge; the primitives are boxed to the {@code Long}/{@code Double} type arguments. */
+       private static Edge<Long, Double> edgeOf(long source, long target, double value) {
+               return new Edge<Long, Double>(source, target, value);
+       }
+
+       /** Private constructor: the class only exposes static members. */
+       private IncrementalSSSPData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..054f041
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure 
example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+       /**
+        * Default edges in text form, one line per edge with two whitespace-separated columns.
+        * The ten lines are the same vertex pairs that getDefaultEdgeDataSet adds.
+        * NOTE(review): the separator widths vary in this archived rendering; presumably each
+        * separator is a single tab in the original file. Confirm before editing the literals.
+        */
+       public static final String EDGES = "1   2\n" + "1       3\n" + "1       
4\n" + "1       5\n" + "2       3\n" + "2       4\n" +
+                       "2      5\n" + "3       4\n" + "3       5\n" + "4       
5";
+
+       /**
+        * Creates the default edge data set: one edge for every vertex pair {@code (i, j)} with
+        * {@code 1 <= i < j <= 5}, each with the value 0.0.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default edges
+        */
+       public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+               // 0.0 literals replace the deprecated new Double(0) boxing constructor; the value is unchanged
+               edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 0.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+               edges.add(new Edge<Long, Double>(1L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+               edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(3L, 4L, 0.0));
+               edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+               edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+
+               return env.fromCollection(edges);
+       }
+
+       /**
+        * One {@code source,target,value} line per default edge; every line carries the value 0.6.
+        * Presumably the expected output of the example program; confirm against its test.
+        */
+       public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + 
"1,4,0.6\n" + "1,5,0.6\n" +
+                       "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + 
"3,5,0.6\n" + "4,5,0.6";
+
+       /** Private constructor: the class only exposes static members. */
+       private JaccardSimilarityMeasureData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
new file mode 100644
index 0000000..8decb24
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is 
used.
+ */
+public class LabelPropagationData {
+
+       /**
+        * Seven {@code vertexId,label} lines for vertices 1 to 7. Presumably the expected labels
+        * of the default graph after one iteration; confirm against the test.
+        */
+       public static final String LABELS_AFTER_1_ITERATION =
+                       "1,10\n" +
+                       "2,10\n" +
+                       "3,10\n" +
+                       "4,40\n" +
+                       "5,40\n" +
+                       "6,40\n" +
+                       "7,40\n";
+
+       /**
+        * Nine {@code vertexId,label} lines for vertices 1 to 9. Presumably the expected labels
+        * for the tie graph; confirm against the test.
+        */
+       public static final String LABELS_WITH_TIE =
+                       "1,10\n" +
+                       "2,10\n" +
+                       "3,10\n" +
+                       "4,10\n" +
+                       "5,20\n" +
+                       "6,20\n" +
+                       "7,20\n" +
+                       "8,20\n" +
+                       "9,20\n";
+
+       /** Private constructor: the class only exposes static members. */
+       private LabelPropagationData() {}
+
+       /**
+        * Creates vertices 1 to 7 with the labels 10, 10, 30, 40, 40, 40, 40.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default vertices
+        */
+       public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+               return env.fromCollection(labeledVertices(10L, 10L, 30L, 40L, 40L, 40L, 40L));
+       }
+
+       /**
+        * Creates the default edges (1,3), (2,3), (4,7), (5,7), (6,7) and (7,3), all with a
+        * {@code NullValue} value.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default edges
+        */
+       public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               return env.fromCollection(valuelessEdges(new long[][] {
+                       {1L, 3L}, {2L, 3L}, {4L, 7L}, {5L, 7L}, {6L, 7L}, {7L, 3L}
+               }));
+       }
+
+       /**
+        * Creates vertices 1 to 9 with the labels 10, 10, 10, 10, 0, 20, 20, 20, 20.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the vertices of the tie graph
+        */
+       public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+               return env.fromCollection(labeledVertices(10L, 10L, 10L, 10L, 0L, 20L, 20L, 20L, 20L));
+       }
+
+       /**
+        * Creates the edges of the tie graph: every edge targets vertex 5, with the sources
+        * 1, 2, 4, 5, 6, 7, 8 and 9 (vertex 3 has no edge), all with a {@code NullValue} value.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the edges of the tie graph
+        */
+       public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+               return env.fromCollection(valuelessEdges(new long[][] {
+                       {1L, 5L}, {2L, 5L}, {4L, 5L}, {5L, 5L}, {6L, 5L}, {7L, 5L}, {8L, 5L}, {9L, 5L}
+               }));
+       }
+
+       /** Builds vertices with consecutive ids starting at 1; the k-th argument is the label of vertex k. */
+       private static List<Vertex<Long, Long>> labeledVertices(long... labels) {
+               List<Vertex<Long, Long>> result = new ArrayList<Vertex<Long, Long>>();
+               for (int pos = 0; pos < labels.length; pos++) {
+                       result.add(new Vertex<Long, Long>(pos + 1L, labels[pos]));
+               }
+               return result;
+       }
+
+       /** Builds one NullValue-valued edge per {source, target} pair, keeping the given order. */
+       private static List<Edge<Long, NullValue>> valuelessEdges(long[][] pairs) {
+               List<Edge<Long, NullValue>> result = new ArrayList<Edge<Long, NullValue>>();
+               for (long[] pair : pairs) {
+                       result.add(new Edge<Long, NullValue>(pair[0], pair[1], NullValue.getInstance()));
+               }
+               return result;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
new file mode 100644
index 0000000..e4c98fe
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class MusicProfilesData {
+
+       /**
+        * Creates the default data set of (user, song, integer) triplets for users 1 to 6.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default triplets, in the order of the table below
+        */
+       public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
+               // One row per triplet; the int literals are boxed to Integer by the Object[] initializer.
+               final Object[][] rows = {
+                       {"user_1", "song_1", 100},
+                       {"user_1", "song_2", 10},
+                       {"user_1", "song_3", 20},
+                       {"user_1", "song_4", 30},
+                       {"user_1", "song_5", 1},
+
+                       {"user_2", "song_6", 40},
+                       {"user_2", "song_7", 10},
+                       {"user_2", "song_8", 3},
+
+                       {"user_3", "song_1", 100},
+                       {"user_3", "song_2", 10},
+                       {"user_3", "song_3", 20},
+                       {"user_3", "song_8", 30},
+                       {"user_3", "song_9", 1},
+                       {"user_3", "song_10", 8},
+                       {"user_3", "song_11", 90},
+                       {"user_3", "song_12", 30},
+                       {"user_3", "song_13", 34},
+                       {"user_3", "song_14", 17},
+
+                       {"user_4", "song_1", 100},
+                       {"user_4", "song_6", 10},
+                       {"user_4", "song_8", 20},
+                       {"user_4", "song_12", 30},
+                       {"user_4", "song_13", 1},
+                       {"user_4", "song_15", 1},
+
+                       {"user_5", "song_3", 300},
+                       {"user_5", "song_4", 4},
+                       {"user_5", "song_5", 5},
+                       {"user_5", "song_8", 8},
+                       {"user_5", "song_9", 9},
+                       {"user_5", "song_10", 10},
+                       {"user_5", "song_12", 12},
+                       {"user_5", "song_13", 13},
+                       {"user_5", "song_15", 15},
+
+                       {"user_6", "song_6", 30}
+               };
+
+               List<Tuple3<String, String, Integer>> result = new ArrayList<Tuple3<String, String, Integer>>();
+               for (Object[] row : rows) {
+                       result.add(new Tuple3<String, String, Integer>((String) row[0], (String) row[1], (Integer) row[2]));
+               }
+
+               return env.fromCollection(result);
+       }
+       
+       /**
+        * Creates the default data set of two {@code ERROR: <song track> title} lines.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default mismatch lines
+        */
+       public static DataSet<String> getMismatches(ExecutionEnvironment env) {
+               final String[] lines = {
+                       "ERROR: <song_8 track_8> Sever",
+                       "ERROR: <song_15 track_15> Black Trees"
+               };
+
+               List<String> mismatches = new ArrayList<String>();
+               for (String line : lines) {
+                       mismatches.add(line);
+               }
+               return env.fromCollection(mismatches);
+       }
+
+       /**
+        * Eight lines of three whitespace-separated columns (user, song, integer).
+        * NOTE(review): the separator widths vary in this archived rendering; presumably each
+        * separator is a single tab in the original file. Confirm before editing the literals.
+        */
+       public static final String USER_SONG_TRIPLETS = "user_1 song_1  100\n" 
+ "user_1        song_5  200\n"
+                       + "user_2       song_1  10\n" + "user_2 song_4  20\n"
+                       + "user_3       song_2  3\n"
+                       + "user_4       song_2  1\n" + "user_4  song_3  2\n"
+                       + "user_5       song_3  30";
+
+       /** A single line in the same {@code ERROR: <song track> title} shape that getMismatches emits. */
+       public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
+
+       /** The number 2 as text. Presumably passed to the example as its iteration limit; confirm against the caller. */
+       public static final String MAX_ITERATIONS = "2";
+
+       /**
+        * Five lines of two whitespace-separated columns (user, song).
+        * Presumably the expected top-song output for USER_SONG_TRIPLETS; confirm against the test.
+        */
+       public static final String TOP_SONGS_RESULT = "user_1   song_1\n" +
+                                                               "user_2 
song_4\n" +
+                                                               "user_3 
song_2\n" +
+                                                               "user_4 
song_3\n" +
+                                                               "user_5 song_3";
+
+       /**
+        * Five lines of two whitespace-separated columns (user, integer).
+        * Presumably the expected community id per user; confirm against the test.
+        */
+       public static final String COMMUNITIES_RESULT = "user_1 1\n" +
+                                                               "user_2 1\n" +
+                                                               "user_3 3\n" +
+                                                               "user_4 3\n" +
+                                                               "user_5 4";
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
new file mode 100644
index 0000000..a45de88
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is 
used.
+ */
+public class PageRankData {
+       
+       /**
+        * Default edges in text form, one line per edge with two whitespace-separated columns.
+        * The nine lines are the same vertex pairs, in the same order, that getDefaultEdgeDataSet adds.
+        * NOTE(review): the separator widths vary in this archived rendering; presumably each
+        * separator is a single tab in the original file. Confirm before editing the literals.
+        */
+       public static final String EDGES = "2   1\n" +
+                                                                               
"5      2\n" + 
+                                                                               
"5      4\n" +
+                                                                               
"4      3\n" +
+                                                                               
"4      2\n" +
+                                                                               
"1      4\n" +
+                                                                               
"1      2\n" +
+                                                                               
"1      3\n" +
+                                                                               
"3      5\n";
+
+       
+       /**
+        * Five {@code vertexId,value} lines for vertices 1 to 5, values given to three decimals.
+        * Presumably the expected ranks after three iterations; confirm against the test.
+        */
+       public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+                                                                               
                                "2,0.248\n" + 
+                                                                               
                                "3,0.173\n" +
+                                                                               
                                "4,0.175\n" +
+                                                                               
                                "5,0.165\n";
+
+       /** Private constructor: the class only exposes static members. */
+       private PageRankData() {}
+
+       /**
+        * Creates the default edge data set: nine directed edges, each with the value 1.0.
+        *
+        * @param env the execution environment the data set is created in
+        * @return the default edges, in the order of the table below
+        */
+       public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               // {source, target} pairs in insertion order
+               final long[][] links = {
+                       {2L, 1L},
+                       {5L, 2L},
+                       {5L, 4L},
+                       {4L, 3L},
+                       {4L, 2L},
+                       {1L, 4L},
+                       {1L, 2L},
+                       {1L, 3L},
+                       {3L, 5L}
+               };
+
+               List<Edge<Long, Double>> result = new ArrayList<Edge<Long, Double>>();
+               for (long[] link : links) {
+                       result.add(new Edge<Long, Double>(link[0], link[1], 1.0));
+               }
+
+               return env.fromCollection(result);
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
new file mode 100644
index 0000000..75b4484
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the Single Source Shortest Paths 
example program.
+ * If no parameters are given to the program, the default edge data set is 
used.
+ */
+public class SingleSourceShortestPathsData {
+
+       public static final Long SRC_VERTEX_ID = 1L;
+
+       public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + 
"2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
+                                       "4\t5\t45.0\n" + "5\t1\t51.0";
+
+       public static final Object[][] DEFAULT_EDGES = new Object[][] {
+               new Object[]{1L, 2L, 12.0},
+               new Object[]{1L, 3L, 13.0},
+               new Object[]{2L, 3L, 23.0},
+               new Object[]{3L, 4L, 34.0},
+               new Object[]{3L, 5L, 35.0},
+               new Object[]{4L, 5L, 45.0},
+               new Object[]{5L, 1L, 51.0}
+       };
+
+       public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =  
"1,0.0\n" + "2,12.0\n" + "3,13.0\n" + 
+                                                               "4,47.0\n" + 
"5,48.0";
+
+       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+               
+               List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, 
Double>>();
+               for (Object[] edge : DEFAULT_EDGES) {
+                       edgeList.add(new Edge<Long, Double>((Long) edge[0], 
(Long) edge[1], (Double) edge[2]));
+               }
+               return env.fromCollection(edgeList);
+       }
+
+       private SingleSourceShortestPathsData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
new file mode 100644
index 0000000..c14d5de
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Provides the default data set used for Summarization tests.
+ */
+public class SummarizationData {
+
+       /** Not instantiable: the class only offers static constants and static factory methods. */
+       private SummarizationData() {}
+
+       /**
+        * The resulting vertex id can be any id of the vertices summarized by the single vertex.
+        *
+        * Format:
+        *
+        * "possible-id[,possible-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_VERTICES = {
+               "0,1;A,2",
+               "2,3,4;B,3",
+               "5;C,1"
+       };
+
+       /**
+        * Format:
+        *
+        * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_EDGES_WITH_VALUES = {
+               "0,1;0,1;A,2",
+               "0,1;2,3,4;A,1",
+               "2,3,4;0,1;A,1",
+               "2,3,4;0,1;C,2",
+               "2,3,4;2,3,4;B,2",
+               "5;2,3,4;D,2"
+       };
+
+       /**
+        * Format:
+        *
+        * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+        */
+       public static final String[] EXPECTED_EDGES_ABSENT_VALUES = {
+               "0,1;0,1;(null),2",
+               "0,1;2,3,4;(null),1",
+               "2,3,4;0,1;(null),3",
+               "2,3,4;2,3,4;(null),2",
+               "5;2,3,4;(null),2"
+       };
+
+       /**
+        * Creates the six vertices 0 to 5 with attached {@link String} values.
+        *
+        * @param env execution environment
+        * @return vertex data set with string values
+        */
+       public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
+               // value of vertex i is stored at index i
+               final String[] groupValues = {"A", "A", "B", "B", "B", "C"};
+
+               List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(groupValues.length);
+               for (int id = 0; id < groupValues.length; id++) {
+                       vertices.add(new Vertex<>((long) id, groupValues[id]));
+               }
+
+               return env.fromCollection(vertices);
+       }
+
+       /**
+        * Creates the ten edges between the vertices 0 to 5 with attached {@link String} values.
+        *
+        * @param env execution environment
+        * @return edge data set with string values
+        */
+       public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
+               // {source, target} pairs; the value of pair i is edgeValues[i]
+               final long[][] endpoints = {
+                       {0L, 1L}, {1L, 0L}, {1L, 2L}, {2L, 1L}, {2L, 3L},
+                       {3L, 2L}, {4L, 0L}, {4L, 1L}, {5L, 2L}, {5L, 3L}
+               };
+               final String[] edgeValues = {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+
+               List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(endpoints.length);
+               for (int i = 0; i < endpoints.length; i++) {
+                       edges.add(new Edge<>(endpoints[i][0], endpoints[i][1], edgeValues[i]));
+               }
+
+               return env.fromCollection(edges);
+       }
+
+       /**
+        * Creates the edges of {@link #getEdges(ExecutionEnvironment)} with the string value of
+        * every edge replaced by {@link NullValue}.
+        *
+        * @param env execution environment
+        * @return edge data set with null values
+        */
+       @SuppressWarnings("serial")
+       public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
+               // keeps source and target, swaps the value for the NullValue singleton
+               MapFunction<Edge<Long, String>, Edge<Long, NullValue>> dropValue =
+                       new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() {
+                               @Override
+                               public Edge<Long, NullValue> map(Edge<Long, String> edge) throws Exception {
+                                       Long source = edge.getSource();
+                                       Long target = edge.getTarget();
+                                       return new Edge<>(source, target, NullValue.getInstance());
+                               }
+                       };
+
+               return getEdges(env).map(dropValue);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
new file mode 100644
index 0000000..71b874c
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Triangle Count test program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class TriangleCountData {
+
+       public static final String EDGES = "1   2\n"+"1 3\n"+"2 3\n"+"2 6\n"+"3 
4\n"+"3 5\n"+"3 6\n"+"4 5\n"+"6 7\n";
+
+       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+               List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, 
NullValue>>();
+               edges.add(new Edge<Long, NullValue>(1L, 2L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(1L, 3L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(2L, 3L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(2L, 6L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(3L, 4L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(3L, 5L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(3L, 6L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(4L, 5L, 
NullValue.getInstance()));
+               edges.add(new Edge<Long, NullValue>(6L, 7L, 
NullValue.getInstance()));
+
+               return env.fromCollection(edges);
+       }
+
+       public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+
+       public static List<Tuple3<Long,Long,Long>> getListOfTriangles() {
+               ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
+               ret.add(new Tuple3<>(1L,2L,3L));
+               ret.add(new Tuple3<>(2L,3L,6L));
+               ret.add(new Tuple3<>(3L,4L,5L));
+               return ret;
+       }
+
+       private TriangleCountData () {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
new file mode 100644
index 0000000..b1bc831
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.utils;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Static helpers for the examples: printing a data set with a message prefix and creating
+ * small generated or hard-coded vertex and edge data sets.
+ */
+public class ExampleUtils {
+
+       /**
+        * Adds an output to {@code set} that prints every record to standard out. When
+        * {@code msg} is not null each record is prefixed with {@code msg} followed by ": ".
+        *
+        * @param set the data set to print
+        * @param msg the message printed in front of every record, may be null
+        */
+       @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+       public static void printResult(DataSet set, String msg) {
+               set.output(new PrintingOutputFormatWithMessage(msg) {
+               });
+       }
+
+       /**
+        * Output format that prints every record to {@code System.out}, optionally prefixed
+        * with a message.
+        *
+        * @param <T> type of the printed records
+        */
+       public static class PrintingOutputFormatWithMessage<T> implements
+                       OutputFormat<T> {
+
+               private static final long serialVersionUID = 1L;
+
+               /** Target stream; assigned in {@link #open(int, int)}, cleared in {@link #close()}. */
+               private transient PrintStream stream;
+
+               /** "message: " prefix built in {@link #open(int, int)}; null when there is no message. */
+               private transient String prefix;
+
+               /** Message given at construction time; null when the no-argument constructor was used. */
+               private String message;
+
+               // --------------------------------------------------------------------------------------------
+
+               /**
+                * Instantiates a printing output format that prints to standard out
+                * without a message prefix.
+                */
+               public PrintingOutputFormatWithMessage() {
+               }
+
+               /**
+                * Instantiates a printing output format that prints to standard out and
+                * prefixes every record with {@code msg} followed by ": ".
+                *
+                * @param msg the message to print in front of every record
+                */
+               public PrintingOutputFormatWithMessage(String msg) {
+                       this.message = msg;
+               }
+
+               @Override
+               public void open(int taskNumber, int numTasks) {
+                       // get the target stream
+                       this.stream = System.out;
+
+                       // Build the prefix only when a message exists: concatenating a null message
+                       // would print the literal text "null: " in front of every record and make
+                       // the prefix-less branch of writeRecord unreachable.
+                       this.prefix = (this.message == null) ? null : this.message + ": ";
+               }
+
+               @Override
+               public void writeRecord(T record) {
+                       if (this.prefix != null) {
+                               this.stream.println(this.prefix + record.toString());
+                       } else {
+                               this.stream.println(record.toString());
+                       }
+               }
+
+               @Override
+               public void close() {
+                       this.stream = null;
+                       this.prefix = null;
+               }
+
+               @Override
+               public String toString() {
+                       return "Print to System.out";
+               }
+
+               @Override
+               public void configure(Configuration parameters) {
+               }
+       }
+
+       /**
+        * Creates one vertex with a {@link NullValue} value for every id in the sequence
+        * generated from 1 to {@code numVertices}.
+        *
+        * @param env execution environment the data set is created in
+        * @param numVertices upper end of the generated id sequence
+        * @return data set of the generated vertices
+        */
+       @SuppressWarnings("serial")
+       public static DataSet<Vertex<Long, NullValue>> getVertexIds(
+                       ExecutionEnvironment env, final long numVertices) {
+               return env.generateSequence(1, numVertices).map(
+                               new MapFunction<Long, Vertex<Long, NullValue>>() {
+                                       @Override
+                                       public Vertex<Long, NullValue> map(Long l) {
+                                               return new Vertex<Long, NullValue>(l, NullValue
+                                                               .getInstance());
+                                       }
+                               });
+       }
+
+       /**
+        * Creates random edges with {@link NullValue} values. For every id in the sequence
+        * generated from 1 to {@code numVertices}, a random number of out-edges (less than
+        * {@code numVertices / 2}) is emitted, each pointing to a random target id between 1
+        * and {@code numVertices}. Targets are drawn independently, so self-loops and
+        * duplicate edges can occur.
+        *
+        * @param env execution environment the data set is created in
+        * @param numVertices upper end of the generated id sequence and of the target ids
+        * @return data set of the generated edges
+        */
+       @SuppressWarnings("serial")
+       public static DataSet<Edge<Long, NullValue>> getRandomEdges(
+                       ExecutionEnvironment env, final long numVertices) {
+               return env.generateSequence(1, numVertices).flatMap(
+                               new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+                                       @Override
+                                       public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
+                                               int numOutEdges = (int) (Math.random() * (numVertices / 2));
+                                               for (int i = 0; i < numOutEdges; i++) {
+                                                       long target = (long) (Math.random() * numVertices) + 1;
+                                                       out.collect(new Edge<Long, NullValue>(key, target,
+                                                                       NullValue.getInstance()));
+                                               }
+                                       }
+                               });
+       }
+
+       /**
+        * Creates five vertices with the ids 1 to 5 whose value is the id as a double.
+        *
+        * @param env execution environment the data set is created in
+        * @return data set of the five vertices
+        */
+       public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
+                       ExecutionEnvironment env) {
+               List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+               vertices.add(new Vertex<Long, Double>(1L, 1.0));
+               vertices.add(new Vertex<Long, Double>(2L, 2.0));
+               vertices.add(new Vertex<Long, Double>(3L, 3.0));
+               vertices.add(new Vertex<Long, Double>(4L, 4.0));
+               vertices.add(new Vertex<Long, Double>(5L, 5.0));
+
+               return env.fromCollection(vertices);
+       }
+
+       /**
+        * Creates seven edges between the vertex ids 1 to 5; the value of an edge is built
+        * from its endpoints (edge 1 to 2 has value 12.0, edge 5 to 1 has value 51.0).
+        *
+        * @param env execution environment the data set is created in
+        * @return data set of the seven edges
+        */
+       public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
+                       ExecutionEnvironment env) {
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
+               edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
+               edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
+               edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
+               edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
+               edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
+
+               return env.fromCollection(edges);
+       }
+
+       /**
+        * Private constructor to prevent instantiation.
+        */
+       private ExampleUtils() {
+               throw new RuntimeException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
new file mode 100644
index 0000000..704d476
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.library.GSAConnectedComponents
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData
+import org.apache.flink.types.NullValue
+import org.apache.flink.api.common.functions.MapFunction
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in 
[[org.apache.flink.graph.library]]. 
+ * 
+ * In particular, this example uses the
+ * [[GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{{
+ *   ConnectedComponents <edge path> <result path> <number of iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+
+  /**
+   * Runs the example. With three arguments the edges are read from a file and the result
+   * is written as CSV; without arguments the built-in default data is used and the result
+   * is printed. Any other number of arguments prints the usage and ends the program.
+   *
+   * @param args either empty or `<edge path> <output path> <num iterations>`
+   */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+    // emit result
+    if (fileOutput) {
+      components.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Connected Components Example")
+    } else {
+      components.print()
+    }
+  }
+
+  /** Vertex initializer: the initial value of a vertex is its own id. */
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long) = id
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+  // true when the program was started with arguments: input is read from
+  // edgesInputPath and the result is written to outputPath
+  private var fileOutput = false
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+  /**
+   * Reads the program arguments into the fields above.
+   *
+   * @param args the program arguments
+   * @return false when arguments were given but their number is not three (the usage is
+   *         printed to standard error), true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 3) {
+        System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+          "<num iterations>")
+        // stop here: continuing would read argument positions that may not exist
+        return false
+      }
+      fileOutput = true
+      edgesInputPath = args(0)
+      outputPath = args(1)
+      // honour the <num iterations> argument instead of a hard-coded value
+      maxIterations = args(2).toInt
+    } else {
+      System.out.println("Executing ConnectedComponents example with default parameters" +
+        " and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+        "<num iterations>")
+    }
+    true
+  }
+
+  /**
+   * Creates the edge data set: read from the tab-separated file at edgesInputPath when
+   * arguments were given, built from the default edges otherwise.
+   *
+   * @param env execution environment the data set is created in
+   * @return the edges, all carrying NullValue
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    } else {
+      val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+      }
+      env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..0a10ad7
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, 
SumFunction}
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+
+  /**
+   * Runs the example. With four arguments the edges are read from a file and the result
+   * is written as CSV; without arguments the built-in default data is used and the result
+   * is printed. Any other number of arguments prints the usage and ends the program.
+   *
+   * @param args either empty or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   */
+  def main(args: Array[String]): Unit = {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the gather-sum-apply iteration
+    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+      new UpdateDistance, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("GSA Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /** Vertex initializer: 0.0 for the vertex with id srcId, positive infinity for all others. */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id == srcId) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /** Gather step: the neighbor's value plus the value of the connecting edge. */
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]) = {
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+    }
+  }
+
+  /** Sum step: keeps the smaller of two gathered values. */
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double) = {
+      Math.min(newValue, currentValue)
+    }
+  }
+
+  /** Apply step: sets the new value as result only when it is smaller than the old one. */
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double) = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  // true when the program was started with arguments: input is read from
+  // edgesInputPath and the result is written to outputPath
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the program arguments into the fields above.
+   *
+   * @param args the program arguments
+   * @return false when arguments were given but their number is not four (the usage is
+   *         printed to standard error), true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        // stop here: continuing would read argument positions that may not exist
+        return false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      // honour the <num iterations> argument instead of a hard-coded value
+      maxIterations = args(3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+    }
+    true
+  }
+
+  /**
+   * Creates the edge data set: read from the tab-separated file at edgesInputPath when
+   * arguments were given, built from the default edges otherwise.
+   *
+   * @param env execution environment the data set is created in
+   * @return the edges with their double values
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
new file mode 100644
index 0000000..f9fa82d
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple 
statistics
+ * from the input graph.  
+ * 
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ *   <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 
vertices.
+ *
+ */
+object GraphMetrics {
+
+  /**
+   * Builds the input graph, computes the metrics and prints each of them on the
+   * task managers.
+   *
+   * @param args empty to run on a generated random graph, or a single path to a
+   *             tab-separated edges file
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    // create the graph
+    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+    // get the number of vertices; named differently from the object-level
+    // `numVertices` constant so that the two are not confused
+    val vertexCount = graph.numberOfVertices
+
+    // get the number of edges
+    val numEdges = graph.numberOfEdges
+
+    // compute the average node degree; convert to Double *before* dividing,
+    // otherwise the integer division truncates the average
+    val verticesWithDegrees = graph.getDegrees
+    val avgDegree = verticesWithDegrees.sum(1).map(in => in._2.toDouble / vertexCount)
+
+    // find the vertex with the maximum in-degree
+    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+    // find the vertex with the minimum in-degree
+    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+    // find the vertex with the maximum out-degree
+    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+    // find the vertex with the minimum out-degree
+    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+    // print the results
+    env.fromElements(vertexCount).printOnTaskManager("Total number of vertices")
+    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
+    avgDegree.printOnTaskManager("Average node degree")
+    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+    minInDegreeVertex.printOnTaskManager("Vertex with Min in-degree")
+    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+    minOutDegreeVertex.printOnTaskManager("Vertex with Min out-degree")
+
+    // printOnTaskManager only registers sinks; the program has to be executed
+    // explicitly for them to produce output
+    env.execute("Graph Metrics Example")
+  }
+
+  /**
+   * Reads the command line arguments into the configuration fields below.
+   *
+   * @param args the program arguments
+   * @return false if arguments were given but their number is not exactly one,
+   *         true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 1) {
+        edgesPath = args(0)
+        true
+      } else {
+        System.err.println("Usage: GraphMetrics <edges path>")
+        false
+      }
+    } else {
+      System.out.println("Executing GraphMetrics example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from a file.")
+      System.out.println("  Usage: GraphMetrics <edges path>")
+      true
+    }
+  }
+
+  /**
+   * Creates the edge data set, either from the file at `edgesPath` or by
+   * generating random edges between `numVertices` vertices.
+   *
+   * @param env the execution environment used to create the data set
+   * @return edges without values
+   */
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = "\t").map(
+        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
+      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+          for ( i <- 0 to numOutEdges ) {
+            val target: Long = ((Math.random() * numVertices) + 1).toLong
+            // the edge has to be handed to the collector, otherwise the
+            // generated graph contains no edges at all
+            out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance()))
+          }
+      })
+    }
+  }
+
+  // true when an edges file was given on the command line
+  private var fileOutput: Boolean = false
+  // path of the edges file; only set when fileOutput is true
+  private var edgesPath: String = null
+  private var outputPath: String = null
+  // number of vertices of the generated random graph
+  private val numVertices = 100
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..4f84bb0
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+
+/**
+ * This example shows how to use Gelly's scatter-gather iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+
+  /**
+   * Runs the scatter-gather shortest-paths iteration and emits the resulting
+   * vertices, either as CSV to `outputPath` or to standard output.
+   *
+   * @param args empty to run on the default data, or
+   *             `<source vertex id> <input edges path> <output path> <num iterations>`
+   */
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the scatter-gather iteration
+    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Initial vertex value: distance 0.0 for the source vertex and positive
+   * infinity for every other vertex.
+   */
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      var minDistance = Double.MaxValue
+      while (inMessages.hasNext) {
+        val msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      // only update when a strictly shorter distance was received
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Distributes the minimum distance associated with a given vertex among all
+   * the target vertices summed up with the edge's value.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      // vertices that still hold the initial infinite distance send nothing
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  /**
+   * Reads the command line arguments into the configuration fields above.
+   *
+   * @param args the program arguments
+   * @return false if arguments were given but their number is not exactly four,
+   *         true otherwise
+   */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        // stop here: continuing would index past the end of args or run with
+        // a half-initialized configuration
+        return false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      // honour the <num iterations> argument instead of a hard-coded value
+      maxIterations = args(3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+    }
+    true
+  }
+
+  /**
+   * Builds the edge data set, either from the tab-separated file at
+   * `edgesInputPath` or from the built-in default edges.
+   *
+   * @param env the execution environment used to create the data set
+   * @return the edges as (source id, target id, distance)
+   */
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..cd8af9b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.CommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+       public CommunityDetectionITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Runs one iteration of community detection on the simple edge data set
+        * and compares the vertices with the expected single-iteration communities.
+        */
+       @Test
+       public void testSingleIteration() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Long, Double> simpleGraph = Graph.fromDataSet(
+                               CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+               compareResultAsTuples(
+                               detectCommunities(simpleGraph),
+                               CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION);
+       }
+
+       /**
+        * Runs one iteration of community detection on the edge data set where a
+        * tie must be broken and compares the vertices with the expected result.
+        */
+       @Test
+       public void testTieBreaker() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, Long, Double> tieGraph = Graph.fromDataSet(
+                               CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+               compareResultAsTuples(
+                               detectCommunities(tieGraph),
+                               CommunityDetectionData.COMMUNITIES_WITH_TIE);
+       }
+
+       /** Runs CommunityDetection with one iteration on the graph and collects its vertices. */
+       private static List<Vertex<Long, Long>> detectCommunities(Graph<Long, Long, Double> graph)
+                       throws Exception {
+               return graph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
+                               .getVertices().collect();
+       }
+
+       /** Vertex value initializer passed to Graph.fromDataSet: maps a vertex id to itself. */
+       @SuppressWarnings("serial")
+       private static final class InitLabels implements MapFunction<Long, Long> {
+
+               @Override
+               public Long map(Long id) {
+                       return id;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..8b9234b
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.LabelPropagationData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+       public LabelPropagationITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Runs one iteration of label propagation on the default data sets and
+        * compares the vertices with the expected labels after one iteration.
+        */
+       @Test
+       public void testSingleIteration() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, NullValue> defaultGraph = Graph.fromDataSet(
+                               LabelPropagationData.getDefaultVertexSet(env),
+                               LabelPropagationData.getDefaultEdgeDataSet(env), env);
+
+               compareResultAsTuples(
+                               propagateOnce(defaultGraph),
+                               LabelPropagationData.LABELS_AFTER_1_ITERATION);
+       }
+
+       /**
+        * Runs one iteration of label propagation on the data sets where a tie
+        * must be broken and compares the vertices with the expected labels.
+        */
+       @Test
+       public void testTieBreaker() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, NullValue> tieGraph = Graph.fromDataSet(
+                               LabelPropagationData.getTieVertexSet(env),
+                               LabelPropagationData.getTieEdgeDataSet(env), env);
+
+               compareResultAsTuples(
+                               propagateOnce(tieGraph),
+                               LabelPropagationData.LABELS_WITH_TIE);
+       }
+
+       /** Runs LabelPropagation with one iteration on the graph and collects the result. */
+       private static List<Vertex<Long, Long>> propagateOnce(Graph<Long, Long, NullValue> graph)
+                       throws Exception {
+               return graph
+                               .run(new LabelPropagation<Long, Long, NullValue>(1))
+                               .collect();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
new file mode 100644
index 0000000..034bcd5
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+       public PageRankITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       @Test
+       public void testPageRankWithThreeIterations() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 3))
+                       .collect();
+        
+        compareWithDelta(result, 0.01);
+       }
+
+       @Test
+       public void testGSAPageRankWithThreeIterations() throws Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 3))
+                       .collect();
+        
+        compareWithDelta(result, 0.01);
+       }
+
+       @Test
+       public void testPageRankWithThreeIterationsAndNumOfVertices() throws 
Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 5, 3))
+                       .collect();
+        
+        compareWithDelta(result, 0.01);
+       }
+
+       @Test
+       public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws 
Exception {
+               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 5, 3))
+                       .collect();
+        
+        compareWithDelta(result, 0.01);
+       }
+
+       private void compareWithDelta(List<Vertex<Long, Double>> result,
+                                                                               
                                                double delta) {
+
+               String resultString = "";
+        for (Vertex<Long, Double> v : result) {
+               resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
+        }
+
+               String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+               String[] expected = expectedResult.isEmpty() ? new String[0] : 
expectedResult.split("\n");
+
+               String[] resultArray = resultString.isEmpty() ? new String[0] : 
resultString.split("\n");
+
+               Arrays.sort(expected);
+        Arrays.sort(resultArray);
+
+               for (int i = 0; i < expected.length; i++) {
+                       String[] expectedFields = expected[i].split(",");
+                       String[] resultFields = resultArray[i].split(",");
+
+                       double expectedPayLoad = 
Double.parseDouble(expectedFields[1]);
+                       double resultPayLoad = 
Double.parseDouble(resultFields[1]);
+
+                       Assert.assertTrue("Values differ by more than the 
permissible delta",
+                                       Math.abs(expectedPayLoad - 
resultPayLoad) < delta);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       private static final class InitMapper implements MapFunction<Long, 
Double> {
+               public Double map(Long value) {
+                       return 1.0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
new file mode 100644
index 0000000..17ddcfa
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.SummarizationData;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the {@link Summarization} library method.
+ *
+ * <p>Each test runs the summarization on the graph provided by {@link SummarizationData},
+ * collects the resulting vertices and edges into local lists and checks them against
+ * expected string fixtures. An expected entry consists of tokens separated by {@code ;}:
+ * the id tokens hold comma-separated candidate ids, the last token holds
+ * {@code groupValue,groupCount}.
+ */
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+       /** Separates the tokens of an expected vertex or edge string. */
+       private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+       /** Separates the ids of an id list, and the group value from the group count. */
+       private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+       public SummarizationITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Summarizes a graph with String vertex values and String edge values.
+        */
+       @Test
+       public void testWithVertexAndEdgeValues() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, String, String> input = Graph.fromDataSet(
+                               SummarizationData.getVertices(env),
+                               SummarizationData.getEdges(env),
+                               env
+               );
+
+               List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+               List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+
+               Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
+                               input.run(new Summarization<Long, String, String>());
+
+               output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+               output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+               env.execute();
+
+               validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+               validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+       }
+
+       /**
+        * Summarizes a graph with String vertex values whose edges carry {@link NullValue}.
+        */
+       @Test
+       public void testWithVertexAndAbsentEdgeValues() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Graph<Long, String, NullValue> input = Graph.fromDataSet(
+                               SummarizationData.getVertices(env),
+                               SummarizationData.getEdgesWithAbsentValues(env),
+                               env
+               );
+
+               List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+               List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+
+               Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
+                               input.run(new Summarization<Long, String, NullValue>());
+
+               output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+               output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+               env.execute();
+
+               validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+               validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
+       }
+
+       /**
+        * Checks the summarized vertices against the expected vertex strings.
+        *
+        * <p>The expected strings are sorted lexicographically, the actual vertices by id, group
+        * value and group count; both sequences are then compared position by position.
+        *
+        * @param expectedVertices expected vertex strings; the array itself is not modified
+        * @param actualVertices summarized vertices; sorted in place
+        */
+       private void validateVertices(String[] expectedVertices,
+                       List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+               // sort a copy so that the shared fixture array is left untouched
+               String[] expected = expectedVertices.clone();
+               Arrays.sort(expected);
+               Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+                       @Override
+                       public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
+                                       Vertex<Long, Summarization.VertexValue<String>> o2) {
+                               int result = o1.getId().compareTo(o2.getId());
+                               if (result == 0) {
+                                       result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+                               }
+                               if (result == 0) {
+                                       result = o1.getValue().getVertexGroupCount().compareTo(o2.getValue().getVertexGroupCount());
+                               }
+                               return result;
+                       }
+               });
+
+               // fail with a clear message instead of an IndexOutOfBoundsException or a silent pass
+               assertEquals("Unexpected number of summarized vertices", expected.length, actualVertices.size());
+
+               for (int i = 0; i < expected.length; i++) {
+                       validateVertex(expected[i], actualVertices.get(i));
+               }
+       }
+
+       /**
+        * Checks the summarized edges against the expected edge strings.
+        *
+        * <p>The expected strings are sorted lexicographically, the actual edges by source id,
+        * target id, group value and group count; both sequences are then compared position by
+        * position.
+        *
+        * @param expectedEdges expected edge strings; the array itself is not modified
+        * @param actualEdges summarized edges; sorted in place
+        */
+       private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
+                       List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+               // sort a copy so that the shared fixture array is left untouched
+               String[] expected = expectedEdges.clone();
+               Arrays.sort(expected);
+               Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>>() {
+                       @Override
+                       public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
+                               int result = o1.getSource().compareTo(o2.getSource());
+                               if (result == 0) {
+                                       result = o1.getTarget().compareTo(o2.getTarget());
+                               }
+                               if (result == 0) {
+                                       result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+                               }
+                               if (result == 0) {
+                                       result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+                               }
+                               return result;
+                       }
+               });
+
+               // fail with a clear message instead of an IndexOutOfBoundsException or a silent pass
+               assertEquals("Unexpected number of summarized edges", expected.length, actualEdges.size());
+
+               for (int i = 0; i < expected.length; i++) {
+                       validateEdge(expected[i], actualEdges.get(i));
+               }
+       }
+
+       /**
+        * Validates one summarized vertex against an expected string of the form
+        * {@code ids;groupValue,groupCount}: the vertex id must be one of the listed ids and the
+        * group value and group count must match.
+        */
+       private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+               String[] tokens = TOKEN_SEPARATOR.split(expected);
+               assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+               assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+               assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
+       }
+
+       /**
+        * Validates one summarized edge against an expected string of the form
+        * {@code sourceIds;targetIds;groupValue,groupCount}: source and target must each be one of
+        * the listed ids, and the string form of the group value and the group count must match.
+        */
+       private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
+               String[] tokens = TOKEN_SEPARATOR.split(expected);
+               assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+               assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+               assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
+               assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
+       }
+
+       /**
+        * Parses a comma-separated list of ids into a list of {@code Long} values.
+        */
+       private List<Long> getListFromIdRange(String idRange) {
+               List<Long> result = Lists.newArrayList();
+               for (String id : ID_SEPARATOR.split(idRange)) {
+                       result.add(Long.parseLong(id));
+               }
+               return result;
+       }
+
+       /**
+        * Returns the group value, i.e. the part of the token before the first comma.
+        */
+       private String getGroupValue(String token) {
+               return ID_SEPARATOR.split(token)[0];
+       }
+
+       /**
+        * Returns the group count, i.e. the second comma-separated field of the token.
+        */
+       private Long getGroupCount(String token) {
+               return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+       }
+}

Reply via email to