http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
new file mode 100644
index 0000000..aaada8f
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.data.TriangleCountData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration test for {@link GSATriangleCount}, parameterized over the
+ * {@code TestExecutionMode} passed to the constructor.
+ */
+@RunWith(Parameterized.class)
+public class TriangleCountITCase extends MultipleProgramsTestBase {
+
+       public TriangleCountITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Runs GSATriangleCount on the undirected version of the default edge data set and
+        * compares the first collected value with
+        * {@link TriangleCountData#RESULTED_NUMBER_OF_TRIANGLES}.
+        */
+       @Test
+       public void testGSATriangleCount() throws Exception {
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(
+                               TriangleCountData.getDefaultEdgeDataSet(env), env).getUndirected();
+
+               List<Integer> numberOfTriangles =
+                               graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
+               String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
+
+               // JUnit's signature is assertEquals(expected, actual); passing the values in that
+               // order keeps the "expected:<..> but was:<..>" failure message truthful.
+               Assert.assertEquals(Integer.parseInt(expectedResult), numberOfTriangles.get(0).intValue());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
new file mode 100644
index 0000000..56b3289
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.data.TriangleCountData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration test for {@link TriangleEnumerator}, parameterized over the
+ * {@code TestExecutionMode} passed to the constructor.
+ */
+@RunWith(Parameterized.class)
+public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
+
+       public TriangleEnumeratorITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       /**
+        * Runs TriangleEnumerator on the default edge data set and checks that it returns as
+        * many triangles as {@link TriangleCountData#getListOfTriangles()} and that every
+        * returned triangle is contained in that expected list.
+        */
+       @Test
+       public void testTriangleEnumerator() throws Exception {
+
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(
+                               TriangleCountData.getDefaultEdgeDataSet(env), env);
+
+               List<Tuple3<Long, Long, Long>> actualOutput =
+                               graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+               List<Tuple3<Long, Long, Long>> expectedResult = TriangleCountData.getListOfTriangles();
+
+               // assertEquals takes (expected, actual); the original call had them reversed.
+               Assert.assertEquals(expectedResult.size(), actualOutput.size());
+               for (Tuple3<Long, Long, Long> resultTriangle : actualOutput) {
+                       // contains() states the membership check directly; the message names the offender.
+                       Assert.assertTrue("unexpected triangle: " + resultTriangle,
+                                       expectedResult.contains(resultTriangle));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..7a3d550
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for the gather-sum-apply library algorithms
+ * {@link GSAConnectedComponents} and {@link GSASingleSourceShortestPaths},
+ * parameterized over the {@code TestExecutionMode} passed to the constructor.
+ */
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+
+       public GatherSumApplyITCase(TestExecutionMode mode){
+               super(mode);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Connected Components Test
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Runs GSAConnectedComponents on the default connected-components edge data set, with
+        * every vertex value initialised to the vertex id, and expects vertices 1 to 4 to all
+        * carry the value 1.
+        */
+       @Test
+       public void testConnectedComponents() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+                               ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+                               new InitMapperCC(), env);
+
+               List<Vertex<Long, Long>> result = inputGraph.run(
+                               new GSAConnectedComponents<Long, NullValue>(16)).collect();
+
+               // Local rather than an instance field: the value is only meaningful inside this test.
+               final String expectedResult = "1,1\n" +
+                               "2,1\n" +
+                               "3,1\n" +
+                               "4,1\n";
+
+               compareResultAsTuples(result, expectedResult);
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Single Source Shortest Path Test
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Runs GSASingleSourceShortestPaths from source vertex 1 on the default shortest-paths
+        * edge data set, with every vertex value initialised to 0.0, and compares the resulting
+        * vertex values with the expected distances.
+        */
+       @Test
+       public void testSingleSourceShortestPaths() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+                               SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+                               new InitMapperSSSP(), env);
+
+               List<Vertex<Long, Double>> result = inputGraph.run(
+                               new GSASingleSourceShortestPaths<>(1L, 16)).collect();
+
+               // Local rather than an instance field: the value is only meaningful inside this test.
+               final String expectedResult = "1,0.0\n" +
+                               "2,12.0\n" +
+                               "3,13.0\n" +
+                               "4,47.0\n" +
+                               "5,48.0\n";
+
+               compareResultAsTuples(result, expectedResult);
+       }
+
+       /** Initialises each vertex value with the vertex id itself. */
+       @SuppressWarnings("serial")
+       private static final class InitMapperCC implements MapFunction<Long, Long> {
+               @Override
+               public Long map(Long value) {
+                       return value;
+               }
+       }
+
+       /** Initialises each vertex value with 0.0, regardless of the vertex id. */
+       @SuppressWarnings("serial")
+       private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+               @Override
+               public Double map(Long value) {
+                       return 0.0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..d0de8dc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.ConnectedComponents;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+/**
+ * Integration test that calls {@code ConnectedComponents.main} on temporary files and
+ * compares the contents of the result path with
+ * {@code ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID}.
+ */
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
+
+       private String edgesPath; // URI string of the temp file that before() fills with the edges
+
+       private String resultPath; // URI string of the temp file passed to the example as result path
+
+       private String expected; // expected result text; set by the test method, checked in after()
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public ConnectedComponentsITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+       /**
+        * Creates the result file and a second temporary file containing
+        * ConnectedComponentsDefaultData.EDGES written as UTF-8; stores both locations as URI strings.
+        */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, 
Charsets.UTF_8);
+               edgesPath = edgesFile.toURI().toString();
+       }
+       /**
+        * Invokes the example's main method with the edges path, the result path and the
+        * default iteration limit, then records the expected output for after().
+        */
+       @Test
+       public void testConnectedComponentsExample() throws Exception {
+               ConnectedComponents.main(new String[]{edgesPath, resultPath, 
ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
+               expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
+       }
+       /**
+        * Performs the actual verification by passing {@code expected} and {@code resultPath} to
+        * TestBaseUtils.compareResultsByLinesInMemory. Note that {@code expected} is only assigned
+        * after main() returns, so it is still null here if the test method threw earlier.
+        */
+       @After
+       public void after() throws Exception {
+               TestBaseUtils.compareResultsByLinesInMemory(expected, 
resultPath);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
new file mode 100644
index 0000000..922c4b2
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.EuclideanGraphWeighing;
+import org.apache.flink.graph.examples.data.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+/**
+ * Integration test that calls {@code EuclideanGraphWeighing.main} on temporary files and
+ * compares the contents of the result path with
+ * {@code EuclideanGraphData.RESULTED_WEIGHTED_EDGES}.
+ */
+@RunWith(Parameterized.class)
+public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
+
+       private String verticesPath; // URI string of the temp file that before() fills with the vertices
+
+       private String edgesPath; // URI string of the temp file that before() fills with the edges
+
+       private String resultPath; // URI string of the temp file passed to the example as result path
+
+       private String expected; // expected result text; set by the test method, checked in after()
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+       /**
+        * Creates the result file plus two temporary input files holding
+        * EuclideanGraphData.VERTICES and EuclideanGraphData.EDGES (written as UTF-8), and stores
+        * all three locations as URI strings.
+        */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+               File verticesFile = tempFolder.newFile();
+               Files.write(EuclideanGraphData.VERTICES, verticesFile, 
Charsets.UTF_8);
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(EuclideanGraphData.EDGES, edgesFile, 
Charsets.UTF_8);
+
+               verticesPath = verticesFile.toURI().toString();
+               edgesPath = edgesFile.toURI().toString();
+       }
+       /**
+        * Invokes the example's main method with the vertices, edges and result paths, then
+        * records the expected output for after().
+        */
+       @Test
+       public void testGraphWeightingWeighing() throws Exception {
+               EuclideanGraphWeighing.main(new String[]{verticesPath, 
edgesPath, resultPath});
+               expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+       }
+       /**
+        * Performs the actual verification by passing {@code expected} and {@code resultPath} to
+        * TestBaseUtils.compareResultsByLinesInMemory. Note that {@code expected} is only assigned
+        * after main() returns, so it is still null here if the test method threw earlier.
+        */
+       @After
+       public void after() throws Exception {
+               TestBaseUtils.compareResultsByLinesInMemory(expected, 
resultPath);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..d27dcd8
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.IncrementalSSSP;
+import org.apache.flink.graph.examples.data.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+/**
+ * Integration tests for the {@code IncrementalSSSP} example: one test drives the example
+ * through its main method, the other rebuilds the non-shortest-path-edge case by hand.
+ * Both write to {@code resultPath}; the comparison with {@code expected} happens in after().
+ */
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+       private String verticesPath; // URI string of the temp file that before() fills with the vertices
+
+       private String edgesPath; // URI string of the temp file that before() fills with the edges
+
+       private String edgesInSSSPPath; // URI string of the temp file holding EDGES_IN_SSSP
+
+       private String resultPath; // URI string of the temp file the tests write their output to
+
+       private String expected; // expected result text; set by each test method, checked in after()
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public IncrementalSSSPITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+       /**
+        * Creates the result file plus three temporary input files holding
+        * IncrementalSSSPData.VERTICES, EDGES and EDGES_IN_SSSP (written as UTF-8), and stores
+        * all four locations as URI strings.
+        */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+               File verticesFile = tempFolder.newFile();
+               Files.write(IncrementalSSSPData.VERTICES, verticesFile, 
Charsets.UTF_8);
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(IncrementalSSSPData.EDGES, edgesFile, 
Charsets.UTF_8);
+
+               File edgesInSSSPFile = tempFolder.newFile();
+               Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, 
Charsets.UTF_8);
+
+               verticesPath = verticesFile.toURI().toString();
+               edgesPath = edgesFile.toURI().toString();
+               edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+       }
+       /**
+        * Invokes the example's main method with the three input paths, the source, target and
+        * value of the edge to be removed, the result path and the vertex count, then records
+        * IncrementalSSSPData.RESULTED_VERTICES as the expected output.
+        */
+       @Test
+       public void testIncrementalSSSP() throws Exception {
+               IncrementalSSSP.main(new String[]{verticesPath, edgesPath, 
edgesInSSSPPath,
+                               IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, 
IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
+                               
IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, 
IncrementalSSSPData.NUM_VERTICES + ""});
+               expected = IncrementalSSSPData.RESULTED_VERTICES;
+       }
+       /**
+        * Removes the edge (3, 5, 5.0), which the test treats as not being part of the
+        * shortest-path tree, and expects the written vertices to equal
+        * IncrementalSSSPData.VERTICES, i.e. the unchanged input.
+        */
+       @Test
+       public void testIncrementalSSSPNonSPEdge() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Vertex<Long, Double>> vertices = 
IncrementalSSSPData.getDefaultVertexDataSet(env);
+               DataSet<Edge<Long, Double>> edges = 
IncrementalSSSPData.getDefaultEdgeDataSet(env);
+               DataSet<Edge<Long, Double>> edgesInSSSP = 
IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+               // the edge to be removed is a non-SP edge
+               Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
+
+               Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
+               // Assumption: all minimum weight paths are kept
+               Graph<Long, Double, Double> ssspGraph = 
Graph.fromDataSet(vertices, edgesInSSSP, env);
+               // remove the edge; NOTE(review): the return value is discarded and graph is not used below - confirm intent
+               graph.removeEdge(edgeToBeRemoved);
+
+               // configure the iteration
+               ScatterGatherConfiguration parameters = new 
ScatterGatherConfiguration();
+
+               if(IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+                       // removed edge is in the SSSP edge set: run the iteration on ssspGraph
+                       parameters.setDirection(EdgeDirection.IN);
+                       parameters.setOptDegrees(true);
+
+                       // run the scatter gather iteration to propagate info
+                       Graph<Long, Double, Double> result = 
ssspGraph.runScatterGatherIteration(
+                                       new 
IncrementalSSSP.VertexDistanceUpdater(),
+                                       new 
IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
+                                       IncrementalSSSPData.NUM_VERTICES, 
parameters);
+
+                       DataSet<Vertex<Long, Double>> resultedVertices = 
result.getVertices();
+
+                       resultedVertices.writeAsCsv(resultPath, "\n", ",");
+                       env.execute();
+               } else {
+                       vertices.writeAsCsv(resultPath, "\n", ",");
+                       env.execute();
+               }
+
+               expected = IncrementalSSSPData.VERTICES;
+       }
+       /**
+        * Performs the actual verification by passing {@code expected} and {@code resultPath} to
+        * TestBaseUtils.compareResultsByLinesInMemory. Note that {@code expected} is only assigned
+        * at the end of each test method, so it is still null here if the test threw earlier.
+        */
+       @After
+       public void after() throws Exception {
+               TestBaseUtils.compareResultsByLinesInMemory(expected, 
resultPath);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
new file mode 100644
index 0000000..92cca86
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.JaccardSimilarityMeasure;
+import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+/**
+ * Integration test that calls {@code JaccardSimilarityMeasure.main} on temporary files and
+ * compares the contents of the result path with
+ * {@code JaccardSimilarityMeasureData.JACCARD_EDGES}.
+ */
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
+
+       private String edgesPath; // URI string of the temp file that before() fills with the edges
+
+       private String resultPath; // URI string of the temp file passed to the example as result path
+
+       private String expected; // expected result text; set by the test method, checked in after()
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+       /**
+        * Creates the result file and a second temporary file containing
+        * JaccardSimilarityMeasureData.EDGES written as UTF-8; stores both locations as URI strings.
+        */
+       @Before
+       public void before() throws Exception {
+               resultPath = tempFolder.newFile().toURI().toString();
+
+               File edgesFile = tempFolder.newFile();
+               Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, 
Charsets.UTF_8);
+
+               edgesPath = edgesFile.toURI().toString();
+       }
+       /**
+        * Invokes the example's main method with the edges path and the result path, then
+        * records the expected output for after().
+        */
+       @Test
+       public void testJaccardSimilarityMeasureExample() throws Exception {
+               JaccardSimilarityMeasure.main(new String[]{edgesPath, 
resultPath});
+               expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+       }
+       /**
+        * Performs the actual verification by passing {@code expected} and {@code resultPath} to
+        * TestBaseUtils.compareResultsByLinesInMemory. Note that {@code expected} is only assigned
+        * after main() returns, so it is still null here if the test method threw earlier.
+        */
+       @After
+       public void after() throws Exception {
+               TestBaseUtils.compareResultsByLinesInMemory(expected, 
resultPath);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
new file mode 100644
index 0000000..d76a3ec
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.examples.MusicProfiles;
+import org.apache.flink.graph.examples.data.MusicProfilesData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+/**
+ * Integration test that calls {@code MusicProfiles.main} on temporary files, compares the
+ * top-songs output with {@code MusicProfilesData.TOP_SONGS_RESULT}, and checks that certain
+ * lines of the sorted communities output agree from character index 7 onwards.
+ */
+@RunWith(Parameterized.class)
+public class MusicProfilesITCase extends MultipleProgramsTestBase {
+
+       private String tripletsPath; // URI string of the temp file holding USER_SONG_TRIPLETS
+
+       private String mismatchesPath; // URI string of the temp file holding MISMATCHES
+
+       private String topSongsResultPath; // URI string of the temp file for the top-songs output
+
+       private String communitiesResultPath; // URI string of the temp file for the communities output
+
+       private String expectedTopSongs; // expected top-songs text; set by the test, checked in after()
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public MusicProfilesITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+       /**
+        * Creates the two result files plus two temporary input files holding
+        * MusicProfilesData.USER_SONG_TRIPLETS and MusicProfilesData.MISMATCHES (written as
+        * UTF-8), and stores all four locations as URI strings.
+        */
+       @Before
+       public void before() throws Exception {
+               topSongsResultPath = tempFolder.newFile().toURI().toString();
+               communitiesResultPath = tempFolder.newFile().toURI().toString();
+
+               File tripletsFile = tempFolder.newFile();
+               Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, 
Charsets.UTF_8);
+               tripletsPath = tripletsFile.toURI().toString();
+
+               File mismatchesFile = tempFolder.newFile();
+               Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, 
Charsets.UTF_8);
+               mismatchesPath = mismatchesFile.toURI().toString();
+       }
+       /**
+        * Invokes the example's main method with the two input paths, the top-songs result path,
+        * the literal argument "0", the communities result path and the default iteration limit,
+        * then records the expected top-songs output for after().
+        */
+       @Test
+       public void testMusicProfilesExample() throws Exception {
+               MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, 
topSongsResultPath, "0", communitiesResultPath,
+                               MusicProfilesData.MAX_ITERATIONS + ""});
+               expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
+       }
+       /**
+        * Performs the actual verification: compares the top-songs output with
+        * {@code expectedTopSongs}, then reads and sorts the communities output and compares
+        * pairs of lines from character index 7 onwards. Indexes 0 to 4 are accessed directly,
+        * so fewer than five result lines cause an ArrayIndexOutOfBoundsException.
+        */
+       @After
+       public void after() throws Exception {
+               TestBaseUtils.compareResultsByLinesInMemory(expectedTopSongs, 
topSongsResultPath);
+
+               ArrayList<String> list = new ArrayList<>();
+               TestBaseUtils.readAllResultLines(list, communitiesResultPath, 
new String[]{}, false);
+
+               String[] result = list.toArray(new String[list.size()]);
+               Arrays.sort(result);
+               // NOTE(review): substring(7) presumably skips a fixed-width user-id prefix - confirm against the output format
+               // check that user_1 and user_2 are in the same community
+               Assert.assertEquals("users 1 and 2 are not in the same 
community",
+                               result[0].substring(7), result[1].substring(7));
+
+               // check that user_3, user_4 and user_5 are in the same 
community
+               Assert.assertEquals("users 3 and 4 are not in the same 
community",
+                               result[2].substring(7), result[3].substring(7));
+               Assert.assertEquals("users 4 and 5 are not in the same 
community",
+                               result[3].substring(7), result[4].substring(7));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
new file mode 100644
index 0000000..faf92c0
--- /dev/null
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.examples.GSASingleSourceShortestPaths;
+import org.apache.flink.graph.examples.SingleSourceShortestPaths;
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+/**
+ * Runs the {@link SingleSourceShortestPaths} and {@link GSASingleSourceShortestPaths}
+ * example programs on the edges from {@link SingleSourceShortestPathsData} and compares
+ * the written result with the expected shortest paths.
+ */
+@RunWith(Parameterized.class)
+public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    /** URI of the temporary file holding the input edges. */
+    private String edgesPath;
+
+    /** URI of the temporary file the example writes its result to. */
+    private String resultPath;
+
+    /** Expected output; set by each test and checked in {@link #after()}. */
+    private String expected;
+
+    public SingleSourceShortestPathsITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    /** Creates the result file and writes the example edges to a fresh input file. */
+    @Before
+    public void before() throws Exception {
+        File resultFile = tempFolder.newFile();
+        resultPath = resultFile.toURI().toString();
+
+        File edgeInput = tempFolder.newFile();
+        Files.write(SingleSourceShortestPathsData.EDGES, edgeInput, Charsets.UTF_8);
+        edgesPath = edgeInput.toURI().toString();
+    }
+
+    @Test
+    public void testSSSPExample() throws Exception {
+        SingleSourceShortestPaths.main(exampleArguments());
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    @Test
+    public void testGSASSSPExample() throws Exception {
+        GSASingleSourceShortestPaths.main(exampleArguments());
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    /** Compares the lines written to the result path with the expected output. */
+    @After
+    public void after() throws Exception {
+        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    /**
+     * Builds the argument array handed to both examples: source vertex id,
+     * edges input path, result path and the iteration count (10).
+     */
+    private String[] exampleArguments() {
+        return new String[]{
+                String.valueOf(SingleSourceShortestPathsData.SRC_VERTEX_ID),
+                edgesPath,
+                resultPath,
+                String.valueOf(10)};
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
deleted file mode 100644
index 75b793e..0000000
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.library.GSAConnectedComponents
-import java.lang.Long
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in [[org.apache.flink.graph.library]].
- * 
- * In particular, this example uses the
- * [[org.apache.flink.graph.library.GSAConnectedComponents]]
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 and 1-3.
- *
- * Usage {{
- *   ConnectedComponents <edge path> <result path> <number of iterations>
- *   }}
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
- */
-object ConnectedComponents {
-  /**
-   * Entry point: parses the arguments, builds the graph from the edge data set,
-   * runs GSAConnectedComponents and either writes the result as CSV or prints it.
-   */
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
-
-    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
-
-
-    // emit result
-    if (fileOutput) {
-      components.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Connected Components Example")
-    } else {
-      components.print()
-    }
-  }
-
-  /** Maps a vertex id to itself; passed to Graph.fromDataSet together with the edges. */
-  private final class InitVertices extends MapFunction[Long, Long] {
-    override def map(id: Long) = id
-  }
-
-  // ***********************************************************************
-  // UTIL METHODS
-  // ***********************************************************************
-
-    private var fileOutput = false
-    private var edgesInputPath: String = null
-    private var outputPath: String = null
-    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
-
-    /**
-     * Reads the program arguments into the fields above.
-     *
-     * With no arguments the built-in default data is used. Otherwise exactly
-     * three arguments are required: edge path, output path and the number of
-     * iterations.
-     *
-     * @return false if arguments were given but their count is wrong, true otherwise
-     */
-    private def parseParameters(args: Array[String]): Boolean = {
-      if(args.length > 0) {
-        if(args.length != 3) {
-          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-            "<num iterations>")
-          // abort instead of continuing with an incomplete argument list
-          false
-        } else {
-          fileOutput = true
-          edgesInputPath = args(0)
-          outputPath = args(1)
-          // honour the <num iterations> argument instead of a hard-coded value
-          maxIterations = args(2).toInt
-          true
-        }
-      } else {
-        System.out.println("Executing ConnectedComponents example with default parameters" +
-          " and built-in default data.")
-        System.out.println("  Provide parameters to read input data from files.")
-        System.out.println("  See the documentation for the correct format of input files.")
-        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-          "<num iterations>")
-        true
-      }
-    }
-
-    /**
-     * Returns the edges either from the tab-separated file at edgesInputPath
-     * (when arguments were given) or from ConnectedComponentsDefaultData.
-     */
-    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-      if (fileOutput) {
-        env.readCsvFile[(Long, Long)](edgesInputPath,
-          lineDelimiter = "\n",
-          fieldDelimiter = "\t")
-          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      } else {
-        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
-          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-        }
-        env.fromCollection(edgeData).map(
-        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
deleted file mode 100644
index 68435ba..0000000
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, 
SumFunction}
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-
-/**
- * This example shows how to use Gelly's gather-sum-apply iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object GSASingleSourceShortestPaths {
-  /**
-   * Entry point: parses the arguments, builds the graph, runs the
-   * gather-sum-apply iteration and writes or prints the resulting vertices.
-   */
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the gather-sum-apply iteration
-    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
-      new UpdateDistance, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("GSA Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  /** Initial vertex value: 0.0 for the source vertex, positive infinity for all others. */
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  /** Gather step: the neighbor's value plus the value of the connecting edge. */
-  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
-    override def gather(neighbor: Neighbor[Double, Double]) = {
-      neighbor.getNeighborValue + neighbor.getEdgeValue
-    }
-  }
-
-  /** Sum step: keeps the smaller of two gathered distances. */
-  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
-    override def sum(newValue: Double, currentValue: Double) = {
-      Math.min(newValue, currentValue)
-    }
-  }
-
-  /** Apply step: stores the new distance only if it is smaller than the old one. */
-  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
-    override def apply(newDistance: Double, oldDistance: Double) = {
-      if (newDistance < oldDistance) {
-        setResult(newDistance)
-      }
-    }
-  }
-
-  // **************************************************************************
-  // UTIL METHODS
-  // **************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  /**
-   * Reads the program arguments into the fields above.
-   *
-   * With no arguments the built-in default data is used. Otherwise exactly four
-   * arguments are required: source vertex id, input edges path, output path and
-   * the number of iterations.
-   *
-   * @return false if arguments were given but their count is wrong, true otherwise
-   */
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-        // abort instead of continuing with an incomplete argument list
-        false
-      } else {
-        fileOutput = true
-        srcVertexId = args(0).toLong
-        edgesInputPath = args(1)
-        outputPath = args(2)
-        // honour the <num iterations> argument instead of a hard-coded value
-        maxIterations = args(3).toInt
-        true
-      }
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>")
-      true
-    }
-  }
-
-  /**
-   * Returns the edges either from the tab-separated file at edgesInputPath
-   * (when arguments were given) or from SingleSourceShortestPathsData.
-   */
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
deleted file mode 100644
index 1c3fcdd..0000000
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
-  /**
-   * Entry point: parses the arguments, builds the graph and registers one
-   * printOnTaskManager output per metric.
-   */
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees
-    // divide as Double so the average is not truncated to a whole number
-    val avgDegree = verticesWithDegrees.sum(1).map(in => in._2 / numVertices.toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    // NOTE(review): printOnTaskManager appears to only register outputs and no
-    // env.execute() call follows here -- confirm whether one is required.
-    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Min in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Min out-degree")
-
-  }
-
-  /**
-   * Reads the program arguments.
-   *
-   * With no arguments a random graph is generated. With exactly one argument
-   * it is used as the edges path.
-   *
-   * @return false if more than one argument was given, true otherwise
-   */
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  /**
-   * Returns the edges either from the tab-separated file at edgesPath or,
-   * when no argument was given, from a randomly generated graph over the
-   * vertex ids 1..numVertices.
-   */
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            val target: Long = ((Math.random() * numVertices) + 1).toLong
-            // emit the edge; constructing it without collecting produced an empty graph
-            out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance()))
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
 
b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
deleted file mode 100644
index 827f1a3..0000000
--- 
a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
-import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-
-/**
- * This example shows how to use Gelly's scatter-gather iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object SingleSourceShortestPaths {
-  /**
-   * Entry point: parses the arguments, builds the graph, runs the
-   * scatter-gather iteration and writes or prints the resulting vertices.
-   */
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the scatter-gather iteration
-    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
-      new MinDistanceMessenger, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  /** Initial vertex value: 0.0 for the source vertex, positive infinity for all others. */
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  /**
-   * Function that updates the value of a vertex by picking the minimum
-   * distance from all incoming messages.
-   */
-  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
-
-    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
-      var minDistance = Double.MaxValue
-      while (inMessages.hasNext) {
-        val msg = inMessages.next
-        if (msg < minDistance) {
-          minDistance = msg
-        }
-      }
-      // only a strictly smaller distance replaces the current vertex value
-      if (vertex.getValue > minDistance) {
-        setNewVertexValue(minDistance)
-      }
-    }
-  }
-
-  /**
-   * Distributes the minimum distance associated with a given vertex among all
-   * the target vertices summed up with the edge's value.
-   */
-  private final class MinDistanceMessenger extends
-    MessagingFunction[Long, Double, Double, Double] {
-
-    override def sendMessages(vertex: Vertex[Long, Double]) {
-      // vertices still at positive infinity send nothing
-      if (vertex.getValue < Double.PositiveInfinity) {
-        for (edge: Edge[Long, Double] <- getEdges) {
-          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
-        }
-      }
-    }
-  }
-
-  // ****************************************************************************
-  // UTIL METHODS
-  // ****************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  /**
-   * Reads the program arguments into the fields above.
-   *
-   * With no arguments the built-in default data is used. Otherwise exactly four
-   * arguments are required: source vertex id, input edges path, output path and
-   * the number of iterations.
-   *
-   * @return false if arguments were given but their count is wrong, true otherwise
-   */
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-        // abort instead of continuing with an incomplete argument list
-        false
-      } else {
-        fileOutput = true
-        srcVertexId = args(0).toLong
-        edgesInputPath = args(1)
-        outputPath = args(2)
-        // honour the <num iterations> argument instead of a hard-coded value
-        maxIterations = args(3).toInt
-        true
-      }
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>")
-      true
-    }
-  }
-
-  /**
-   * Returns the edges either from the tab-separated file at edgesInputPath
-   * (when arguments were given) or from SingleSourceShortestPathsData.
-   */
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
deleted file mode 100644
index cd52e04..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in {@link 
org.apache.flink.graph.library}. 
- * 
- * In particular, this example uses the {@link 
org.apache.flink.graph.library.GSAConnectedComponents}
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 with and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
- */
-public class ConnectedComponents implements ProgramDescription {
-
-       @SuppressWarnings("serial")
-       public static void main(String [] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
-                       @Override
-                       public Long map(Long value) throws Exception {
-                               return value;
-                       }
-               }, env);
-
-               DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-                               .run(new GSAConnectedComponents<Long, 
NullValue>(maxIterations));
-
-               // emit result
-               if (fileOutput) {
-                       verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-
-                       // since file sinks are lazy, we trigger the execution 
explicitly
-                       env.execute("Connected Components Example");
-               } else {
-                       verticesWithMinIds.print();
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Connected Components Example";
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String edgeInputPath = null;
-       private static String outputPath = null;
-       private static Integer maxIterations = 
ConnectedComponentsDefaultData.MAX_ITERATIONS;
-
-       private static boolean parseParameters(String [] args) {
-               if(args.length > 0) {
-                       if(args.length != 3) {
-                               System.err.println("Usage ConnectedComponents 
<edge path> <output path> " +
-                                               "<num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       edgeInputPath = args[0];
-                       outputPath = args[1];
-                       maxIterations = Integer.parseInt(args[2]);
-
-               } else {
-                       System.out.println("Executing ConnectedComponents 
example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input 
data from files.");
-                       System.out.println("Usage ConnectedComponents <edge 
path> <output path> " +
-                                       "<num iterations>");
-               }
-
-               return true;
-       }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .ignoreComments("#")
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
-                                               @Override
-                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
-                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
-                                               }
-                                       });
-               } else {
-                       return 
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
-               }
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
deleted file mode 100644
index 712be3e..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeJoinFunction;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-
-import java.io.Serializable;
-
-/**
- * This example shows how to use Gelly's {@link Graph#getTriplets()} and
- * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods.
- * 
- * Given a directed, unweighted graph, with vertex values representing points 
in a plan,
- * return a weighted graph where the edge weights are equal to the Euclidean 
distance between the
- * src and the trg vertex values.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- *     <li> Vertices are represented by their vertexIds and vertex values and 
are separated by newlines,
- *     the value being formed of two doubles separated by a comma.
- *     For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a 
data set of three vertices
- *     <li> Edges are represented by pairs of srcVertexId, trgVertexId 
separated by commas.
- *     Edges themselves are separated by newlines.
- *     For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
- * </ul>
- *
- * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; 
&lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
- */
-@SuppressWarnings("serial")
-public class EuclideanGraphWeighing implements ProgramDescription {
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
-
-               // the edge value will be the Euclidean distance between its 
src and trg vertex
-               DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = 
graph.getTriplets()
-                               .map(new MapFunction<Triplet<Long, Point, 
Double>, Tuple3<Long, Long, Double>>() {
-
-                                       @Override
-                                       public Tuple3<Long, Long, Double> 
map(Triplet<Long, Point, Double> triplet)
-                                                       throws Exception {
-
-                                               Vertex<Long, Point> srcVertex = 
triplet.getSrcVertex();
-                                               Vertex<Long, Point> trgVertex = 
triplet.getTrgVertex();
-
-                                               return new Tuple3<Long, Long, 
Double>(srcVertex.getId(), trgVertex.getId(),
-                                                               
srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
-                                       }
-                               });
-
-               Graph<Long, Point, Double> resultedGraph = 
graph.joinWithEdges(edgesWithEuclideanWeight,
-                               new EdgeJoinFunction<Double, Double>() {
-
-                                       public Double edgeJoin(Double 
edgeValue, Double inputValue) {
-                                               return inputValue;
-                                       }
-                               });
-
-               // retrieve the edges from the final result
-               DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
-
-               // emit result
-               if (fileOutput) {
-                       result.writeAsCsv(outputPath, "\n", ",");
-
-                       // since file sinks are lazy, we trigger the execution 
explicitly
-                       env.execute("Euclidean Graph Weighing Example");
-               } else {
-                       result.print();
-               }
-
-       }
-
-       @Override
-       public String getDescription() {
-               return "Weighing a graph by computing the Euclidean distance " +
-                               "between its vertices";
-       }
-
-       // 
*************************************************************************
-       //     DATA TYPES
-       // 
*************************************************************************
-
-       /**
-        * A simple two-dimensional point.
-        */
-       public static class Point implements Serializable {
-
-               public double x, y;
-
-               public Point() {}
-
-               public Point(double x, double y) {
-                       this.x = x;
-                       this.y = y;
-               }
-
-               public double euclideanDistance(Point other) {
-                       return Math.sqrt((x-other.x)*(x-other.x) + 
(y-other.y)*(y-other.y));
-               }
-
-               @Override
-               public String toString() {
-                       return x + " " + y;
-               }
-       }
-
-       // 
******************************************************************************************************************
-       // UTIL METHODS
-       // 
******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static String verticesInputPath = null;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       if (args.length == 3) {
-                               fileOutput = true;
-                               verticesInputPath = args[0];
-                               edgesInputPath = args[1];
-                               outputPath = args[2];
-                       } else {
-                               System.out.println("Executing Euclidean Graph 
Weighing example with default parameters and built-in default data.");
-                               System.out.println("Provide parameters to read 
input data from files.");
-                               System.out.println("See the documentation for 
the correct format of input files.");
-                               System.err.println("Usage: 
EuclideanGraphWeighing <input vertices path> <input edges path>" +
-                                               " <output path>");
-                               return false;
-                       }
-               }
-               return true;
-       }
-
-       private static DataSet<Vertex<Long, Point>> 
getVerticesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(verticesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Double.class, 
Double.class)
-                                       .map(new MapFunction<Tuple3<Long, 
Double, Double>, Vertex<Long, Point>>() {
-
-                                               @Override
-                                               public Vertex<Long, Point> 
map(Tuple3<Long, Double, Double> value) throws Exception {
-                                                       return new Vertex<Long, 
Point>(value.f0, new Point(value.f1, value.f2));
-                                               }
-                                       });
-               } else {
-                       return EuclideanGraphData.getDefaultVertexDataSet(env);
-               }
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, Double>>() {
-
-                                               @Override
-                                               public Edge<Long, Double> 
map(Tuple2<Long, Long> tuple2) throws Exception {
-                                                       return new Edge<Long, 
Double>(tuple2.f0, tuple2.f1, 0.0);
-                                               }
-                                       });
-               } else {
-                       return EuclideanGraphData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
deleted file mode 100755
index 635a099..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use Gelly's Gather-Sum-Apply iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a vertex-centric implementation of the same algorithm, please refer to 
{@link SingleSourceShortestPaths}. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which 
are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
- */
-public class GSASingleSourceShortestPaths implements ProgramDescription {
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Program
-       // 
--------------------------------------------------------------------------------------------
-
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
-
-               // Execute the GSA iteration
-               Graph<Long, Double, Double> result = 
graph.runGatherSumApplyIteration(
-                               new CalculateDistances(), new 
ChooseMinDistance(), new UpdateDistance(), maxIterations);
-
-               // Extract the vertices as the result
-               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
-
-               // emit result
-               if(fileOutput) {
-                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
-
-                       // since file sinks are lazy, we trigger the execution 
explicitly
-                       env.execute("GSA Single Source Shortest Paths");
-               } else {
-                       singleSourceShortestPaths.print();
-               }
-
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Single Source Shortest Path UDFs
-       // 
--------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("serial")
-       private static final class InitVertices implements MapFunction<Long, 
Double>{
-
-               private long srcId;
-
-               public InitVertices(long srcId) {
-                       this.srcId = srcId;
-               }
-
-               public Double map(Long id) {
-                       if (id.equals(srcId)) {
-                               return 0.0;
-                       }
-                       else {
-                               return Double.POSITIVE_INFINITY;
-                       }
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class CalculateDistances extends 
GatherFunction<Double, Double, Double> {
-
-               public Double gather(Neighbor<Double, Double> neighbor) {
-                       return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
-               }
-       };
-
-       @SuppressWarnings("serial")
-       private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
-
-               public Double sum(Double newValue, Double currentValue) {
-                       return Math.min(newValue, currentValue);
-               }
-       };
-
-       @SuppressWarnings("serial")
-       private static final class UpdateDistance extends ApplyFunction<Long, 
Double, Double> {
-
-               public void apply(Double newDistance, Double oldDistance) {
-                       if (newDistance < oldDistance) {
-                               setResult(newDistance);
-                       }
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Util methods
-       // 
--------------------------------------------------------------------------------------------
-
-       private static boolean fileOutput = false;
-
-       private static Long srcVertexId = 1l;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static int maxIterations = 5;
-
-       private static boolean parseParameters(String[] args) {
-
-               if (args.length > 0) {
-                       if(args.length != 4) {
-                               System.err.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       srcVertexId = Long.parseLong(args[0]);
-                       edgesInputPath = args[1];
-                       outputPath = args[2];
-                       maxIterations = Integer.parseInt(args[3]);
-               } else {
-                               System.out.println("Executing GSASingle Source 
Shortest Paths example "
-                                               + "with default parameters and 
built-in default data.");
-                               System.out.println("  Provide parameters to 
read input data from files.");
-                               System.out.println("  See the documentation for 
the correct format of input files.");
-                               System.out.println("Usage: 
GSASingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgeDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
-               } else {
-                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "GSA Single Source Shortest Paths";
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
deleted file mode 100644
index 117f7d1..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.ExampleUtils;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-public class GraphMetrics implements ProgramDescription {
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               /** create the graph **/
-               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(getEdgesDataSet(env), env);
-               
-               /** get the number of vertices **/
-               long numVertices = graph.numberOfVertices();
-               
-               /** get the number of edges **/
-               long numEdges = graph.numberOfEdges();
-               
-               /** compute the average node degree **/
-               DataSet<Tuple2<Long, Long>> verticesWithDegrees = 
graph.getDegrees();
-
-               DataSet<Double> avgNodeDegree = verticesWithDegrees
-                               .aggregate(Aggregations.SUM, 1).map(new 
AvgNodeDegreeMapper(numVertices));
-               
-               /** find the vertex with the maximum in-degree **/
-               DataSet<Long> maxInDegreeVertex = 
graph.inDegrees().maxBy(1).map(new ProjectVertexId());
-
-               /** find the vertex with the minimum in-degree **/
-               DataSet<Long> minInDegreeVertex = 
graph.inDegrees().minBy(1).map(new ProjectVertexId());
-
-               /** find the vertex with the maximum out-degree **/
-               DataSet<Long> maxOutDegreeVertex = 
graph.outDegrees().maxBy(1).map(new ProjectVertexId());
-
-               /** find the vertex with the minimum out-degree **/
-               DataSet<Long> minOutDegreeVertex = 
graph.outDegrees().minBy(1).map(new ProjectVertexId());
-               
-               /** print the results **/
-               ExampleUtils.printResult(env.fromElements(numVertices), "Total 
number of vertices");
-               ExampleUtils.printResult(env.fromElements(numEdges), "Total 
number of edges");
-               ExampleUtils.printResult(avgNodeDegree, "Average node degree");
-               ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max 
in-degree");
-               ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min 
in-degree");
-               ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max 
out-degree");
-               ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min 
out-degree");
-
-               env.execute();
-       }
-
-       @SuppressWarnings("serial")
-       private static final class AvgNodeDegreeMapper implements 
MapFunction<Tuple2<Long, Long>, Double> {
-
-               private long numberOfVertices;
-
-               public AvgNodeDegreeMapper(long numberOfVertices) {
-                       this.numberOfVertices = numberOfVertices;
-               }
-
-               public Double map(Tuple2<Long, Long> sumTuple) {
-                       return (double) (sumTuple.f1 / numberOfVertices) ;
-               }
-       }
-
-       @SuppressWarnings("serial")
-       private static final class ProjectVertexId implements 
MapFunction<Tuple2<Long,Long>, Long> {
-               public Long map(Tuple2<Long, Long> value) { return value.f0; }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Graph Metrics Example";
-       }
-
-       // ******************************************************************************************************************
-       // UTIL METHODS
-       // ******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static String edgesInputPath = null;
-
-       static final int NUM_VERTICES = 100;
-
-       static final long SEED = 9876;
-
-       private static boolean parseParameters(String[] args) {
-
-               if(args.length > 0) {
-                       if(args.length != 1) {
-                               System.err.println("Usage: GraphMetrics <input 
edges>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       edgesInputPath = args[0];
-               } else {
-                       System.out.println("Executing Graph Metrics example 
with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("Usage: GraphMetrics <input edges>");
-               }
-               return true;
-       }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       
.lineDelimiter("\n").fieldDelimiter("\t")
-                                       .types(Long.class, Long.class).map(
-                                                       new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-                                                               public 
Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-                                                                       return 
new Edge<Long, NullValue>(value.f0, value.f1, 
-                                                                               
        NullValue.getInstance());
-                                                               }
-                                       });
-               } else {
-                       return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
-               }
-       }
-}
\ No newline at end of file

Reply via email to