http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 866f334..12047e7 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -41,7 +41,7 @@ import org.apache.flink.types.NullValue;
  * 
  * The result is a DataSet of vertices, where the vertex value corresponds to 
the assigned component ID.
  * 
- * @see org.apache.flink.graph.library.GSAConnectedComponents
+ * @see GSAConnectedComponents
  */
 @SuppressWarnings("serial")
 public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, 
DataSet<Vertex<K, Long>>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index a44ba14..0354da4 100755
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue;
  * This implementation assumes that the vertices of the input Graph are 
initialized with unique, Long component IDs.
  * The result is a DataSet of vertices, where the vertex value corresponds to 
the assigned component ID.
  * 
- * @see org.apache.flink.graph.library.ConnectedComponents
+ * @see ConnectedComponents
  */
 public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, 
EV, DataSet<Vertex<K, Long>>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 0c5080d..29183e9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -116,4 +116,4 @@ public class SingleSourceShortestPaths<K> implements 
GraphAlgorithm<K, Double, D
                        }
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index c6bba4c..3842e6c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -353,4 +353,4 @@ public class TriangleEnumerator<K extends Comparable<K>, 
VV, EV> implements
                        this.setField(vertex3, V3);
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..2f619a6
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends 
JavaProgramTestBase {
+
+       private static final long SEED = 9487520347802987L;
+
+       private static final int NUM_VERTICES = 1000;
+
+       private static final int NUM_EDGES = 10000;
+
+       private String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               resultPath = getTempFilePath("results");
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+               DataSet<String> edgeString = 
env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, 
NUM_VERTICES, SEED).split("\n"));
+
+               DataSet<Edge<Long, NullValue>> edges = edgeString.map(new 
EdgeParser());
+
+               DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new 
IdAssigner());
+
+               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
+
+               DataSet<Vertex<Long, Long>> result = graph.run(new 
ConnectedComponents<Long, NullValue>(100));
+
+               result.writeAsCsv(resultPath, "\n", " ");
+               env.execute();
+       }
+
+       /**
+        * A map function that takes a Long value and creates a 2-tuple out of 
it:
+        * <pre>(Long value) -> (value, value)</pre>
+        */
+       public static final class IdAssigner implements MapFunction<Long, 
Vertex<Long, Long>> {
+               @Override
+               public Vertex<Long, Long> map(Long value) {
+                       return new Vertex<>(value, value);
+               }
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               for (BufferedReader reader : getResultReader(resultPath)) {
+                       ConnectedComponentsData.checkOddEvenResult(reader);
+               }
+       }
+
+       public static final class EdgeParser extends RichMapFunction<String, 
Edge<Long, NullValue>> {
+               public Edge<Long, NullValue> map(String value) {
+                       String[] nums = value.split(" ");
+                       return new Edge<>(Long.parseLong(nums[0]), 
Long.parseLong(nums[1]),
+                                       NullValue.getInstance());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
deleted file mode 100755
index 039a05c..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GatherSumApplyITCase extends MultipleProgramsTestBase {
-
-       public GatherSumApplyITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String expectedResult;
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Connected Components Test
-       // 
--------------------------------------------------------------------------------------------
-
-       @Test
-       public void testConnectedComponents() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-                               
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
-                               new InitMapperCC(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(
-                       new GSAConnectedComponents<Long, 
NullValue>(16)).collect();
-
-               expectedResult = "1,1\n" +
-                               "2,1\n" +
-                               "3,1\n" +
-                               "4,1\n";
-
-               compareResultAsTuples(result, expectedResult);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Single Source Shortest Path Test
-       // 
--------------------------------------------------------------------------------------------
-
-       @Test
-       public void testSingleSourceShortestPaths() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-                               
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
-                               new InitMapperSSSP(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(
-                       new GSASingleSourceShortestPaths<>(1L, 16)).collect();
-
-               expectedResult = "1,0.0\n" +
-                               "2,12.0\n" +
-                               "3,13.0\n" +
-                               "4,47.0\n" +
-                               "5,48.0\n";
-
-               compareResultAsTuples(result, expectedResult);
-       }
-
-       @SuppressWarnings("serial")
-       private static final class InitMapperCC implements MapFunction<Long, 
Long> {
-               public Long map(Long value) {
-                       return value;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class InitMapperSSSP implements MapFunction<Long, 
Double> {
-               public Double map(Long value) {
-                       return 0.0;
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
deleted file mode 100644
index b0bacc4..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.ConnectedComponents;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
-
-       private String edgesPath;
-
-       private String resultPath;
-
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public ConnectedComponentsITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-
-               File edgesFile = tempFolder.newFile();
-               Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, 
Charsets.UTF_8);
-               edgesPath = edgesFile.toURI().toString();
-       }
-
-       @Test
-       public void testConnectedComponentsExample() throws Exception {
-               ConnectedComponents.main(new String[]{edgesPath, resultPath, 
ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
-               expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
deleted file mode 100644
index 183c429..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.EuclideanGraphWeighing;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
-
-       private String verticesPath;
-
-       private String edgesPath;
-
-       private String resultPath;
-
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-               File verticesFile = tempFolder.newFile();
-               Files.write(EuclideanGraphData.VERTICES, verticesFile, 
Charsets.UTF_8);
-
-               File edgesFile = tempFolder.newFile();
-               Files.write(EuclideanGraphData.EDGES, edgesFile, 
Charsets.UTF_8);
-
-               verticesPath = verticesFile.toURI().toString();
-               edgesPath = edgesFile.toURI().toString();
-       }
-
-       @Test
-       public void testGraphWeightingWeighing() throws Exception {
-               EuclideanGraphWeighing.main(new String[]{verticesPath, 
edgesPath, resultPath});
-               expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
deleted file mode 100644
index b4cdfd5..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.IncrementalSSSP;
-import org.apache.flink.graph.example.utils.IncrementalSSSPData;
-import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
-
-       private String verticesPath;
-
-       private String edgesPath;
-
-       private String edgesInSSSPPath;
-
-       private String resultPath;
-
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public IncrementalSSSPITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-               File verticesFile = tempFolder.newFile();
-               Files.write(IncrementalSSSPData.VERTICES, verticesFile, 
Charsets.UTF_8);
-
-               File edgesFile = tempFolder.newFile();
-               Files.write(IncrementalSSSPData.EDGES, edgesFile, 
Charsets.UTF_8);
-
-               File edgesInSSSPFile = tempFolder.newFile();
-               Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, 
Charsets.UTF_8);
-
-               verticesPath = verticesFile.toURI().toString();
-               edgesPath = edgesFile.toURI().toString();
-               edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
-       }
-
-       @Test
-       public void testIncrementalSSSP() throws Exception {
-               IncrementalSSSP.main(new String[]{verticesPath, edgesPath, 
edgesInSSSPPath,
-                               IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, 
IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
-                               
IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, 
IncrementalSSSPData.NUM_VERTICES + ""});
-               expected = IncrementalSSSPData.RESULTED_VERTICES;
-       }
-
-       @Test
-       public void testIncrementalSSSPNonSPEdge() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Vertex<Long, Double>> vertices = 
IncrementalSSSPData.getDefaultVertexDataSet(env);
-               DataSet<Edge<Long, Double>> edges = 
IncrementalSSSPData.getDefaultEdgeDataSet(env);
-               DataSet<Edge<Long, Double>> edgesInSSSP = 
IncrementalSSSPData.getDefaultEdgesInSSSP(env);
-               // the edge to be removed is a non-SP edge
-               Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
-               // Assumption: all minimum weight paths are kept
-               Graph<Long, Double, Double> ssspGraph = 
Graph.fromDataSet(vertices, edgesInSSSP, env);
-               // remove the edge
-               graph.removeEdge(edgeToBeRemoved);
-
-               // configure the iteration
-               ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
-
-               if(IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
-
-                       parameters.setDirection(EdgeDirection.IN);
-                       parameters.setOptDegrees(true);
-
-                       // run the scatter gather iteration to propagate info
-                       Graph<Long, Double, Double> result = 
ssspGraph.runScatterGatherIteration(
-                                       new 
IncrementalSSSP.VertexDistanceUpdater(),
-                                       new 
IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
-                                       IncrementalSSSPData.NUM_VERTICES, 
parameters);
-
-                       DataSet<Vertex<Long, Double>> resultedVertices = 
result.getVertices();
-
-                       resultedVertices.writeAsCsv(resultPath, "\n", ",");
-                       env.execute();
-               } else {
-                       vertices.writeAsCsv(resultPath, "\n", ",");
-                       env.execute();
-               }
-
-               expected = IncrementalSSSPData.VERTICES;
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
deleted file mode 100644
index 294a756..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.JaccardSimilarityMeasure;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
-
-       private String edgesPath;
-
-       private String resultPath;
-
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-
-               File edgesFile = tempFolder.newFile();
-               Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, 
Charsets.UTF_8);
-
-               edgesPath = edgesFile.toURI().toString();
-       }
-
-       @Test
-       public void testJaccardSimilarityMeasureExample() throws Exception {
-               JaccardSimilarityMeasure.main(new String[]{edgesPath, 
resultPath});
-               expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
deleted file mode 100644
index 8152885..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.MusicProfiles;
-import org.apache.flink.graph.example.utils.MusicProfilesData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class MusicProfilesITCase extends MultipleProgramsTestBase {
-
-       private String tripletsPath;
-
-       private String mismatchesPath;
-
-       private String topSongsResultPath;
-
-       private String communitiesResultPath;
-
-       private String expectedTopSongs;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       public MusicProfilesITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Before
-       public void before() throws Exception {
-               topSongsResultPath = tempFolder.newFile().toURI().toString();
-               communitiesResultPath = tempFolder.newFile().toURI().toString();
-
-               File tripletsFile = tempFolder.newFile();
-               Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, 
Charsets.UTF_8);
-               tripletsPath = tripletsFile.toURI().toString();
-
-               File mismatchesFile = tempFolder.newFile();
-               Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, 
Charsets.UTF_8);
-               mismatchesPath = mismatchesFile.toURI().toString();
-       }
-
-       @Test
-       public void testMusicProfilesExample() throws Exception {
-               MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, 
topSongsResultPath, "0", communitiesResultPath,
-                               MusicProfilesData.MAX_ITERATIONS + ""});
-               expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expectedTopSongs, 
topSongsResultPath);
-
-               ArrayList<String> list = new ArrayList<>();
-               readAllResultLines(list, communitiesResultPath, new String[]{}, 
false);
-
-               String[] result = list.toArray(new String[list.size()]);
-               Arrays.sort(result);
-
-               // check that user_1 and user_2 are in the same community
-               Assert.assertEquals("users 1 and 2 are not in the same 
community",
-                               result[0].substring(7), result[1].substring(7));
-
-               // check that user_3, user_4 and user_5 are in the same 
community
-               Assert.assertEquals("users 3 and 4 are not in the same 
community",
-                               result[2].substring(7), result[3].substring(7));
-               Assert.assertEquals("users 4 and 5 are not in the same 
community",
-                               result[3].substring(7), result[4].substring(7));
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
deleted file mode 100644
index d8f8c8f..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
-import org.apache.flink.graph.example.SingleSourceShortestPaths;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
-
-    private String edgesPath;
-
-    private String resultPath;
-
-    private String expected;
-
-    @Rule
-    public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    public SingleSourceShortestPathsITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    @Before
-    public void before() throws Exception {
-        resultPath = tempFolder.newFile().toURI().toString();
-
-        File edgesFile = tempFolder.newFile();
-        Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, 
Charsets.UTF_8);
-        edgesPath = edgesFile.toURI().toString();
-    }
-
-    @Test
-    public void testSSSPExample() throws Exception {
-        SingleSourceShortestPaths.main(new 
String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                edgesPath, resultPath, 10 + ""});
-        expected = 
SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
-    }
-
-    @Test
-    public void testGSASSSPExample() throws Exception {
-        GSASingleSourceShortestPaths.main(new 
String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                edgesPath, resultPath, 10 + ""});
-        expected = 
SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
-    }
-
-    @After
-    public void after() throws Exception {
-        compareResultsByLinesInMemory(expected, resultPath);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
deleted file mode 100644
index 421eaa9..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.graph.library.CommunityDetection;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CommunityDetectionITCase extends MultipleProgramsTestBase {
-
-       public CommunityDetectionITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private String expected;
-
-       @Test
-       public void testSingleIteration() throws Exception {
-               /*
-                * Test one iteration of the Simple Community Detection Example
-                */
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
-                               
CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
-                       .getVertices().collect();
-
-               expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
-               compareResultAsTuples(result, expected);
-       }
-
-       @Test
-       public void testTieBreaker() throws Exception {
-               /*
-                * Test one iteration of the Simple Community Detection Example 
where a tie must be broken
-                */
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
-                               CommunityDetectionData.getTieEdgeDataSet(env), 
new InitLabels(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(new 
CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
-                       .getVertices().collect();
-               expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
-               compareResultAsTuples(result, expected);
-       }
-
-       @SuppressWarnings("serial")
-       private static final class InitLabels implements MapFunction<Long, 
Long>{
-
-               public Long map(Long id) {
-                       return id;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
deleted file mode 100644
index c8d85f0..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-import java.io.BufferedReader;
-
-@SuppressWarnings("serial")
-public class ConnectedComponentsWithRandomisedEdgesITCase extends 
JavaProgramTestBase {
-
-       private static final long SEED = 9487520347802987L;
-
-       private static final int NUM_VERTICES = 1000;
-
-       private static final int NUM_EDGES = 10000;
-
-       private String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempFilePath("results");
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-               DataSet<String> edgeString = 
env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, 
NUM_VERTICES, SEED).split("\n"));
-
-               DataSet<Edge<Long, NullValue>> edges = edgeString.map(new 
EdgeParser());
-
-               DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new 
IdAssigner());
-
-               Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
-
-               DataSet<Vertex<Long, Long>> result = graph.run(new 
ConnectedComponents<Long, NullValue>(100));
-
-               result.writeAsCsv(resultPath, "\n", " ");
-               env.execute();
-       }
-
-       /**
-        * A map function that takes a Long value and creates a 2-tuple out of 
it:
-        * <pre>(Long value) -> (value, value)</pre>
-        */
-       public static final class IdAssigner implements MapFunction<Long, 
Vertex<Long, Long>> {
-               @Override
-               public Vertex<Long, Long> map(Long value) {
-                       return new Vertex<>(value, value);
-               }
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               for (BufferedReader reader : getResultReader(resultPath)) {
-                       ConnectedComponentsData.checkOddEvenResult(reader);
-               }
-       }
-
-       public static final class EdgeParser extends RichMapFunction<String, 
Edge<Long, NullValue>> {
-               public Edge<Long, NullValue> map(String value) {
-                       String[] nums = value.split(" ");
-                       return new Edge<>(Long.parseLong(nums[0]), 
Long.parseLong(nums[1]),
-                                       NullValue.getInstance());
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
deleted file mode 100644
index 520269b..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.LabelPropagationData;
-import org.apache.flink.graph.library.LabelPropagation;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationITCase extends MultipleProgramsTestBase {
-
-       public LabelPropagationITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-    private String expectedResult;
-
-       @Test
-       public void testSingleIteration() throws Exception {
-               /*
-                * Test one iteration of label propagation example with a 
simple graph
-                */
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-                               LabelPropagationData.getDefaultVertexSet(env),
-                               
LabelPropagationData.getDefaultEdgeDataSet(env), env);
-
-        List<Vertex<Long, Long>> result = inputGraph
-                       .run(new LabelPropagation<Long, Long, NullValue>(1))
-                       .collect();
-
-               expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
-               compareResultAsTuples(result, expectedResult);
-       }
-
-       @Test
-       public void testTieBreaker() throws Exception {
-               /*
-                * Test the label propagation example where a tie must be broken
-                */
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-                               LabelPropagationData.getTieVertexSet(env),
-                               LabelPropagationData.getTieEdgeDataSet(env), 
env);
-
-        List<Vertex<Long, Long>> result = inputGraph
-                       .run(new LabelPropagation<Long, Long, NullValue>(1))
-                       .collect();
-
-               expectedResult = LabelPropagationData.LABELS_WITH_TIE;
-               compareResultAsTuples(result, expectedResult);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
deleted file mode 100644
index 431ab70..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.PageRankData;
-import org.apache.flink.graph.library.GSAPageRank;
-import org.apache.flink.graph.library.PageRank;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
-       public PageRankITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       @Test
-       public void testPageRankWithThreeIterations() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 3))
-                       .collect();
-        
-        compareWithDelta(result, 0.01);
-       }
-
-       @Test
-       public void testGSAPageRankWithThreeIterations() throws Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 3))
-                       .collect();
-        
-        compareWithDelta(result, 0.01);
-       }
-
-       @Test
-       public void testPageRankWithThreeIterationsAndNumOfVertices() throws 
Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new 
PageRank<Long>(0.85, 5, 3))
-                       .collect();
-        
-        compareWithDelta(result, 0.01);
-       }
-
-       @Test
-       public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws 
Exception {
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-                               PageRankData.getDefaultEdgeDataSet(env), new 
InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new 
GSAPageRank<Long>(0.85, 5, 3))
-                       .collect();
-        
-        compareWithDelta(result, 0.01);
-       }
-
-       private void compareWithDelta(List<Vertex<Long, Double>> result,
-                                                                               
                                                double delta) {
-
-               String resultString = "";
-        for (Vertex<Long, Double> v : result) {
-               resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
-        }
-
-               String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
-               String[] expected = expectedResult.isEmpty() ? new String[0] : 
expectedResult.split("\n");
-
-               String[] resultArray = resultString.isEmpty() ? new String[0] : 
resultString.split("\n");
-
-               Arrays.sort(expected);
-        Arrays.sort(resultArray);
-
-               for (int i = 0; i < expected.length; i++) {
-                       String[] expectedFields = expected[i].split(",");
-                       String[] resultFields = resultArray[i].split(",");
-
-                       double expectedPayLoad = 
Double.parseDouble(expectedFields[1]);
-                       double resultPayLoad = 
Double.parseDouble(resultFields[1]);
-
-                       Assert.assertTrue("Values differ by more than the 
permissible delta",
-                                       Math.abs(expectedPayLoad - 
resultPayLoad) < delta);
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class InitMapper implements MapFunction<Long, 
Double> {
-               public Double map(Long value) {
-                       return 1.0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
deleted file mode 100644
index abb4511..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SummarizationData;
-import org.apache.flink.graph.library.Summarization;
-import org.apache.flink.graph.library.Summarization.EdgeValue;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class SummarizationITCase extends MultipleProgramsTestBase {
-
-       private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
-
-       private static final Pattern ID_SEPARATOR = Pattern.compile(",");
-
-       public SummarizationITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testWithVertexAndEdgeValues() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, String, String> input = Graph.fromDataSet(
-                               SummarizationData.getVertices(env),
-                               SummarizationData.getEdges(env),
-                               env
-               );
-
-               List<Vertex<Long, Summarization.VertexValue<String>>> 
summarizedVertices = Lists.newArrayList();
-               List<Edge<Long, EdgeValue<String>>> summarizedEdges = 
Lists.newArrayList();
-
-               Graph<Long, Summarization.VertexValue<String>, 
EdgeValue<String>> output =
-                               input.run(new Summarization<Long, String, 
String>());
-
-               output.getVertices().output(new 
LocalCollectionOutputFormat<>(summarizedVertices));
-               output.getEdges().output(new 
LocalCollectionOutputFormat<>(summarizedEdges));
-
-               env.execute();
-
-               validateVertices(SummarizationData.EXPECTED_VERTICES, 
summarizedVertices);
-               validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, 
summarizedEdges);
-       }
-
-       @Test
-       public void testWithVertexAndAbsentEdgeValues() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               Graph<Long, String, NullValue> input = Graph.fromDataSet(
-                               SummarizationData.getVertices(env),
-                               SummarizationData.getEdgesWithAbsentValues(env),
-                               env
-               );
-
-               List<Vertex<Long, Summarization.VertexValue<String>>> 
summarizedVertices = Lists.newArrayList();
-               List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = 
Lists.newArrayList();
-
-               Graph<Long, Summarization.VertexValue<String>, 
EdgeValue<NullValue>> output =
-                               input.run(new Summarization<Long, String, 
NullValue>());
-
-               output.getVertices().output(new 
LocalCollectionOutputFormat<>(summarizedVertices));
-               output.getEdges().output(new 
LocalCollectionOutputFormat<>(summarizedEdges));
-
-               env.execute();
-
-               validateVertices(SummarizationData.EXPECTED_VERTICES, 
summarizedVertices);
-               validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, 
summarizedEdges);
-       }
-
-       private void validateVertices(String[] expectedVertices,
-                                                                               
                                                List<Vertex<Long, 
Summarization.VertexValue<String>>> actualVertices) {
-               Arrays.sort(expectedVertices);
-               Collections.sort(actualVertices, new Comparator<Vertex<Long, 
Summarization.VertexValue<String>>>() {
-                       @Override
-                       public int compare(Vertex<Long, 
Summarization.VertexValue<String>> o1,
-                                                                               
                 Vertex<Long, Summarization.VertexValue<String>> o2) {
-                               int result = o1.getId().compareTo(o2.getId());
-                               if (result == 0) {
-                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-                               }
-                               if (result == 0) {
-                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-                               }
-                               if (result == 0) {
-                                       result = 
o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-                               }
-                               return result;
-                       }
-               });
-
-               for (int i = 0; i < expectedVertices.length; i++) {
-                       validateVertex(expectedVertices[i], 
actualVertices.get(i));
-               }
-       }
-
-       private <EV extends Comparable<EV>> void validateEdges(String[] 
expectedEdges,
-                                                                               
                                 List<Edge<Long, EdgeValue<EV>>> actualEdges) {
-               Arrays.sort(expectedEdges);
-               Collections.sort(actualEdges, new Comparator<Edge<Long, 
EdgeValue<EV>>> () {
-
-                       @Override
-                       public int compare(Edge<Long, EdgeValue<EV>> o1, 
Edge<Long, EdgeValue<EV>> o2) {
-                               int result = 
o1.getSource().compareTo(o2.getSource());
-                               if (result == 0) {
-                                       result = 
o1.getTarget().compareTo(o2.getTarget());
-                               }
-                               if (result == 0) {
-                                       result = 
o1.getTarget().compareTo(o2.getTarget());
-                               }
-                               if (result == 0) {
-                                       result = 
o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
-                               }
-                               if (result == 0) {
-                                       result = 
o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
-                               }
-                               return result;
-                       }
-               });
-
-               for (int i = 0; i < expectedEdges.length; i++) {
-                       validateEdge(expectedEdges[i], actualEdges.get(i));
-               }
-       }
-
-       private void validateVertex(String expected, Vertex<Long, 
Summarization.VertexValue<String>> actual) {
-               String[] tokens = TOKEN_SEPARATOR.split(expected);
-               
assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
-               assertEquals(getGroupValue(tokens[1]), 
actual.getValue().getVertexGroupValue());
-               assertEquals(getGroupCount(tokens[1]), 
actual.getValue().getVertexGroupCount());
-       }
-
-       private <EV> void validateEdge(String expected, Edge<Long, 
EdgeValue<EV>> actual) {
-               String[] tokens = TOKEN_SEPARATOR.split(expected);
-               
assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
-               
assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
-               assertEquals(getGroupValue(tokens[2]), 
actual.getValue().getEdgeGroupValue().toString());
-               assertEquals(getGroupCount(tokens[2]), 
actual.getValue().getEdgeGroupCount());
-       }
-
-       private List<Long> getListFromIdRange(String idRange) {
-               List<Long> result = Lists.newArrayList();
-               for (String id : ID_SEPARATOR.split(idRange)) {
-                       result.add(Long.parseLong(id));
-               }
-               return result;
-       }
-
-       private String getGroupValue(String token) {
-               return ID_SEPARATOR.split(token)[0];
-       }
-
-       private Long getGroupCount(String token) {
-               return Long.valueOf(ID_SEPARATOR.split(token)[1]);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
deleted file mode 100644
index 15f59fe..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.TriangleCountData;
-import org.apache.flink.graph.library.GSATriangleCount;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleCountITCase extends MultipleProgramsTestBase {
-
-       public TriangleCountITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testGSATriangleCount() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-                               env).getUndirected();
-
-               List<Integer> numberOfTriangles = graph.run(new 
GSATriangleCount<Long, NullValue, NullValue>()).collect();
-               String expectedResult = 
TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
-
-               Assert.assertEquals(numberOfTriangles.get(0).intValue(), 
Integer.parseInt(expectedResult));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
deleted file mode 100644
index d06ba30..0000000
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.TriangleCountData;
-import org.apache.flink.graph.library.TriangleEnumerator;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
-
-       public TriangleEnumeratorITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       @Test
-       public void testTriangleEnumerator() throws Exception   {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-                               env);
-
-               List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new 
TriangleEnumerator<Long, NullValue, NullValue>()).collect();
-               List<Tuple3<Long,Long,Long>>  expectedResult = 
TriangleCountData.getListOfTriangles();
-
-               Assert.assertEquals(actualOutput.size(), expectedResult.size());
-               for(Tuple3<Long,Long,Long> resultTriangle:actualOutput) {
-                       
Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 34153c9..64ad7e4 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,6 +36,7 @@ under the License.
        <modules>
                <module>flink-gelly</module>
                <module>flink-gelly-scala</module>
+               <module>flink-gelly-examples</module>
                <module>flink-python</module>
                <module>flink-table</module>
                <module>flink-ml</module>

Reply via email to