http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
deleted file mode 100644
index 9e00760..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.IncrementalSSSPData;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-/**
- * This example illustrates how to 
- * <ul>
- *  <li> create a Graph directly from CSV files
- *  <li> use the scatter-gather iteration's messaging direction configuration 
option
- * </ul>
- * 
- * Incremental Single Sink Shortest Paths Example. Shortest Paths are 
incrementally updated
- * upon edge removal.
- *
- * The program takes as input the graph resulting from an SSSP computation,
- * an edge to be removed and the initial graph (i.e. before SSSP was computed).
- * In the following description, SP-graph is used as an abbreviation for
- * the graph resulting from the SSSP computation. We denote the edges that belong to this
- * graph by SP-edges.
- *
- * - If the removed edge does not belong to the SP-graph, no computation is 
necessary.
- * The edge is simply removed from the graph.
- * - If the removed edge is an SP-edge, then all nodes, whose shortest path 
contains the removed edge,
- * potentially require re-computation.
- * When the edge {@code <u, v>} is removed, v checks if it has another 
out-going SP-edge.
- * If yes, no further computation is required.
- * If v has no other out-going SP-edge, it invalidates its current value, by 
setting it to INF.
- * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE 
message.
- * When a vertex u receives an INVALIDATE message from v, it checks whether it 
has another out-going SP-edge.
- * If not, it invalidates its current value and propagates the INVALIDATE 
message.
- * The propagation stops when a vertex with an alternative shortest path is 
reached
- * or when we reach a vertex with no SP-in-neighbors.
- *
- * Usage <code>IncrementalSSSP &lt;vertex path&gt; &lt;edge path&gt; &lt;edges 
in SSSP&gt;
- * &lt;src id edge to be removed&gt; &lt;trg id edge to be removed&gt; &lt;val 
edge to be removed&gt;
- * &lt;result path&gt; &lt;number of iterations&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
- */
-@SuppressWarnings("serial")
-public class IncrementalSSSP implements ProgramDescription {
-
-       /**
-        * Entry point of the incremental SSSP example.
-        *
-        * <p>Loads the initial graph and the SP-graph and removes the given edge from the initial
-        * graph. If the edge is part of the SP-graph, a scatter-gather iteration over the SP-graph
-        * sets the value of the affected vertices to Double.MAX_VALUE and the resulting vertices
-        * are emitted; otherwise the vertices of the initial graph are emitted. Output goes to a
-        * CSV file when fileOutput is set, and to standard output otherwise.
-        *
-        * @param args the program arguments, see parseParameters
-        * @throws Exception if the membership check or the job execution fails
-        */
-       public static void main(String [] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-               Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
-
-               Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env);
-
-               // Assumption: all minimum weight paths are kept
-               Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env);
-
-               // remove the edge; removeEdge does not modify the graph it is called on but
-               // returns the graph without the edge, so the result has to be kept
-               graph = graph.removeEdge(edgeToBeRemoved);
-
-               // configure the iteration
-               ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
-
-               if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
-
-                       // use the in-direction for messaging
-                       parameters.setDirection(EdgeDirection.IN);
-                       // VertexDistanceUpdater calls getOutDegree()
-                       parameters.setOptDegrees(true);
-
-                       // run the scatter-gather iteration to propagate info
-                       Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(new VertexDistanceUpdater(),
-                                       new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
-
-                       DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
-
-                       // Emit results
-                       if(fileOutput) {
-                               resultedVertices.writeAsCsv(outputPath, "\n", ",");
-                               env.execute("Incremental SSSP Example");
-                       } else {
-                               resultedVertices.print();
-                       }
-               } else {
-                       // the removed edge is not an SP-edge: emit the vertices unchanged
-                       if(fileOutput) {
-                               graph.getVertices().writeAsCsv(outputPath, "\n", ",");
-                               env.execute("Incremental SSSP Example");
-                       } else {
-                               graph.getVertices().print();
-                       }
-               }
-       }
-
-       /**
-        * Returns the short, human-readable description of this example program.
-        */
-       @Override
-       public String getDescription() {
-               final String description = "Incremental Single Sink Shortest Paths Example";
-               return description;
-       }
-
-       // 
******************************************************************************************************************
-       // IncrementalSSSP METHODS
-       // 
******************************************************************************************************************
-
-       /**
-        * Checks whether the edge to be removed is contained in the given SP-graph edge set.
-        *
-        * <p>The data set is filtered for edges equal to the given edge and the matches are counted.
-        *
-        * @param edgeToBeRemoved the edge whose membership is tested
-        * @param edgesInSSSP the edges of the SP-graph
-        * @return true if at least one edge of edgesInSSSP equals edgeToBeRemoved, false otherwise
-        * @throws Exception propagated from counting the matching edges
-        */
-       public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
-
-               FilterFunction<Edge<Long, Double>> sameAsRemovedEdge = new FilterFunction<Edge<Long, Double>>() {
-                       @Override
-                       public boolean filter(Edge<Long, Double> candidate) throws Exception {
-                               return candidate.equals(edgeToBeRemoved);
-                       }
-               };
-
-               long matches = edgesInSSSP.filter(sameAsRemovedEdge).count();
-               return matches > 0;
-       }
-
-       /**
-        * Vertex update function that sets the vertex value to Double.MAX_VALUE when the vertex
-        * received at least one message and its out-degree minus one is not positive.
-        */
-       public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
-
-               @Override
-               public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
-                       if (!inMessages.hasNext()) {
-                               // no message received, nothing to do
-                               return;
-                       }
-
-                       // discount one out-going edge; if none remains the vertex has no other SP-edge
-                       long remainingOutEdges = getOutDegree() - 1;
-                       if (remainingOutEdges <= 0) {
-                               // set own value to infinity
-                               setNewVertexValue(Double.MAX_VALUE);
-                       }
-               }
-       }
-
-       /**
-        * Messaging function that sends Double.MAX_VALUE messages: in the first superstep only
-        * from the source vertex of the removed edge to itself, in later supersteps to the
-        * source of every edge returned by getEdges().
-        */
-       public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
-
-               private Edge<Long, Double> edgeToBeRemoved;
-
-               public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
-                       this.edgeToBeRemoved = edgeToBeRemoved;
-               }
-
-               @Override
-               public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
-                       final int superstep = getSuperstepNumber();
-
-                       if (superstep == 1) {
-                               // only the source of the removed edge sends a message, addressed to itself
-                               final Long removedEdgeSource = edgeToBeRemoved.getSource();
-                               if (vertex.getId().equals(removedEdgeSource)) {
-                                       sendMessageTo(removedEdgeSource, Double.MAX_VALUE);
-                               }
-                       } else if (superstep > 1) {
-                               // invalidate all edges: message the source of every edge
-                               for (Edge<Long, Double> edge : getEdges()) {
-                                       sendMessageTo(edge.getSource(), Double.MAX_VALUE);
-                               }
-                       }
-               }
-       }
-
-       // 
******************************************************************************************************************
-       // UTIL METHODS
-       // 
******************************************************************************************************************
-
-       // set to true by parseParameters when all eight arguments are given; selects file input and CSV output
-       private static boolean fileOutput = false;
-
-       // args[0]: vertex input path, used for both the initial graph and the SP-graph
-       private static String verticesInputPath = null;
-
-       // args[1]: edge input path of the initial graph
-       private static String edgesInputPath = null;
-
-       // args[2]: edge input path of the SP-graph
-       private static String edgesInSSSPInputPath = null;
-
-       // args[3]: source vertex id of the edge to be removed
-       private static Long srcEdgeToBeRemoved = null;
-
-       // args[4]: target vertex id of the edge to be removed
-       private static Long trgEdgeToBeRemoved = null;
-
-       // args[5]: value of the edge to be removed
-       private static Double valEdgeToBeRemoved = null;
-
-       // args[6]: path the resulting vertices are written to
-       private static String outputPath = null;
-
-       // iteration count passed to runScatterGatherIteration; 5 unless overridden by args[7]
-       private static int maxIterations = 5;
-
-       /**
-        * Parses the program arguments into the static configuration fields.
-        *
-        * <p>With no arguments the example runs on the built-in default data and a hint on how
-        * to provide input files is printed. With exactly eight arguments, input is read from
-        * and output is written to the given paths. Any other argument count is rejected with
-        * a usage message on standard error.
-        *
-        * @param args the program arguments
-        * @return true if the program should run, false if the argument count is invalid
-        * @throws NumberFormatException if a vertex id, the edge value or the iteration count
-        *         cannot be parsed
-        */
-       private static boolean parseParameters(String[] args) {
-               final String usage = "Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
-                               "<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
-                               "<output path> <max iterations>";
-
-               if (args.length > 0) {
-                       if (args.length != 8) {
-                               // wrong argument count: report the usage and do not run
-                               System.err.println(usage);
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       verticesInputPath = args[0];
-                       edgesInputPath = args[1];
-                       edgesInSSSPInputPath = args[2];
-                       srcEdgeToBeRemoved = Long.parseLong(args[3]);
-                       trgEdgeToBeRemoved = Long.parseLong(args[4]);
-                       valEdgeToBeRemoved = Double.parseDouble(args[5]);
-                       outputPath = args[6];
-                       maxIterations = Integer.parseInt(args[7]);
-               } else {
-                       // no arguments: this is the path that really runs with the default data
-                       System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input data from files.");
-                       System.out.println("See the documentation for the correct format of input files.");
-                       System.out.println(usage);
-               }
-               return true;
-       }
-
-       /**
-        * Builds the initial graph: from the vertex and edge CSV files when fileOutput is set,
-        * otherwise from the default vertex and edge data sets of IncrementalSSSPData.
-        */
-       private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
-               if (!fileOutput) {
-                       return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env),
-                                       IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
-               }
-
-               return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env)
-                               .lineDelimiterEdges("\n")
-                               .types(Long.class, Double.class, Double.class);
-       }
-
-       /**
-        * Builds the SP-graph: from the vertex file and the SP-edge file when fileOutput is set,
-        * otherwise from the default vertices and default SP-edges of IncrementalSSSPData.
-        */
-       private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
-               if (!fileOutput) {
-                       return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env),
-                                       IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
-               }
-
-               return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env)
-                               .lineDelimiterEdges("\n")
-                               .types(Long.class, Double.class, Double.class);
-       }
-
-       /**
-        * Returns the edge to be removed: built from the parsed source id, target id and value
-        * when fileOutput is set, otherwise the default edge of IncrementalSSSPData.
-        */
-       private static Edge<Long, Double> getEdgeToBeRemoved() {
-               if (!fileOutput) {
-                       return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
-               }
-
-               return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
deleted file mode 100644
index 5fb75e2..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.VertexJoinFunction;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-
-import java.util.HashSet;
-
-/**
- * This example shows how to use
- * <ul>
- *  <li> neighborhood methods
- *  <li> join with vertices
- *  <li> triplets
- * </ul>
- * 
- * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
- * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
- * of the union of neighbor sets - for the source and target vertices.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <br>
- *     Edges are represented by pairs of srcVertexId, trgVertexId separated by 
tabs.
- *     Edges themselves are separated by newlines.
- *     For example: <code>1    2\n1    3\n</code> defines two edges 1-2 and 
1-3.
- * </p>
- *
- * Usage <code> JaccardSimilarityMeasure &lt;edge path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
- */
-@SuppressWarnings("serial")
-public class JaccardSimilarityMeasure implements ProgramDescription {
-
-       /**
-        * Runs the example: gathers the neighbor set of every vertex, stores it as the vertex
-        * value and maps every triplet to an edge weighted with the Jaccard coefficient.
-        *
-        * @param args either empty (default data) or the edge input path and the output path
-        * @throws Exception if the job execution fails
-        */
-       public static void main(String [] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> inputEdges = getEdgesDataSet(env);
-
-               // every vertex starts with a set that holds only its own id
-               MapFunction<Long, HashSet<Long>> ownIdAsSet = new MapFunction<Long, HashSet<Long>>() {
-
-                       @Override
-                       public HashSet<Long> map(Long id) throws Exception {
-                               HashSet<Long> self = new HashSet<Long>();
-                               self.add(id);
-                               return self;
-                       }
-               };
-               Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(inputEdges, ownIdAsSet, env);
-
-               // create the set of neighbors
-               DataSet<Tuple2<Long, HashSet<Long>>> neighborSets =
-                               graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
-
-               // join with the vertices: the gathered set replaces the vertex value
-               VertexJoinFunction<HashSet<Long>, HashSet<Long>> takeGatheredSet =
-                               new VertexJoinFunction<HashSet<Long>, HashSet<Long>>() {
-
-                       @Override
-                       public HashSet<Long> vertexJoin(HashSet<Long> vertexValue, HashSet<Long> inputValue) {
-                               return inputValue;
-                       }
-               };
-               Graph<Long, HashSet<Long>, Double> graphWithNeighborSets =
-                               graph.joinWithVertices(neighborSets, takeGatheredSet);
-
-               // compare neighbors, compute Jaccard
-               DataSet<Edge<Long, Double>> jaccardEdges =
-                               graphWithNeighborSets.getTriplets().map(new ComputeJaccard());
-
-               // emit result
-               if (!fileOutput) {
-                       jaccardEdges.print();
-                       return;
-               }
-
-               jaccardEdges.writeAsCsv(outputPath, "\n", ",");
-
-               // since file sinks are lazy, we trigger the execution explicitly
-               env.execute("Executing Jaccard Similarity Measure");
-       }
-
-       /**
-        * Returns the short, human-readable description of this example program.
-        */
-       @Override
-       public String getDescription() {
-               final String description = "Vertex Jaccard Similarity Measure";
-               return description;
-       }
-
-       /**
-        * Reduce function that merges two neighbor-id sets: the second set is added into the
-        * first one and a copy of the merged set is returned.
-        */
-       private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
-
-               @Override
-               public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
-                       // accumulate into the first set, then hand out a fresh copy of it
-                       HashSet<Long> merged = first;
-                       merged.addAll(second);
-                       return new HashSet<Long>(merged);
-               }
-       }
-
-       /**
-        * Maps a triplet to an edge whose value is the Jaccard coefficient of its end points.
-        *
-        * For an edge x-y with neighbor sets X and Y:
-        * |X| + |Y| = |union| + |intersection|, and |union| is the size of a hash set
-        * that received all elements of X and of Y. The intersection size is the difference
-        * of the two, and the coefficient is |intersection| / |union|.
-        */
-       private static final class ComputeJaccard implements
-                       MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
-
-               @Override
-               public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception {
-
-                       Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
-                       Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
-                       HashSet<Long> sourceNeighbors = source.getValue();
-                       HashSet<Long> targetNeighbors = target.getValue();
-
-                       // within a HashSet, all elements are distinct, so this yields the union
-                       HashSet<Long> unionSet = new HashSet<Long>(sourceNeighbors);
-                       unionSet.addAll(targetNeighbors);
-
-                       double sizeSum = sourceNeighbors.size() + targetNeighbors.size();
-                       double unionSize = unionSet.size();
-                       double intersectionSize = sizeSum - unionSize;
-
-                       return new Edge<Long, Double>(source.getId(), target.getId(), intersectionSize / unionSize);
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       // set to true by parseParameters when both arguments are given; selects file input and CSV output
-       private static boolean fileOutput = false;
-       // args[0]: path of the tab-separated edge file read by getEdgesDataSet
-       private static String edgeInputPath = null;
-       // args[1]: path the weighted edges are written to
-       private static String outputPath = null;
-
-       /**
-        * Parses the program arguments into the static configuration fields.
-        *
-        * @param args either empty (run with default data) or edge path and output path
-        * @return false if arguments are given but their count is not two, true otherwise
-        */
-       private static boolean parseParameters(String [] args) {
-               if (args.length == 0) {
-                       // no arguments: announce the run on the built-in data
-                       System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
-                       System.out.println("Provide parameters to read input data from files.");
-                       System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
-                       return true;
-               }
-
-               if (args.length != 2) {
-                       System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
-                       return false;
-               }
-
-               fileOutput = true;
-               edgeInputPath = args[0];
-               outputPath = args[1];
-               return true;
-       }
-
-       /**
-        * Returns the input edges: read from the tab-separated edge file (lines starting with
-        * "#" are ignored) with every edge value set to 0 when fileOutput is set, otherwise the
-        * default edge data set of JaccardSimilarityMeasureData.
-        */
-       private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-
-               if (!fileOutput) {
-                       return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
-               }
-
-               return env.readCsvFile(edgeInputPath)
-                               .ignoreComments("#")
-                               .fieldDelimiter("\t")
-                               .lineDelimiter("\n")
-                               .types(Long.class, Long.class)
-                               .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-                                       @Override
-                                       public Edge<Long, Double> map(Tuple2<Long, Long> idPair) throws Exception {
-                                               // the input carries no weights; start every edge at 0
-                                               return new Edge<Long, Double>(idPair.f0, idPair.f1, 0.0d);
-                                       }
-                               });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
deleted file mode 100644
index b2857d0..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.EdgesFunctionWithVertexValue;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexJoinFunction;
-import org.apache.flink.graph.example.utils.MusicProfilesData;
-import org.apache.flink.graph.library.LabelPropagation;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-/**
- * This example demonstrates how to mix the DataSet Flink API with the Gelly 
API.
- * The input is a set of &lt;userId - songId - playCount&gt; triplets and
- * a set of bad records, i.e. song ids that should not be trusted.
- * Initially, we use the DataSet API to filter out the bad records.
- * Then, we use Gelly to create a user -&gt; song weighted bipartite graph and 
compute
- * the top song (most listened) per user.
- * Then, we use the DataSet API again, to create a user-user similarity graph,
- * based on common songs, where users that are listeners of the same song
- * are connected. A user-defined threshold on the playcount value
- * defines when a user is considered to be a listener of a song.
- * Finally, we use the graph API to run the label propagation community 
detection algorithm on
- * the similarity graph.
- *
- * The triplets input is expected to be given as one triplet per line,
- * in the following format: 
"&lt;userID&gt;\t&lt;songID&gt;\t&lt;playcount&gt;".
- *
- * The mismatches input file is expected to contain one mismatch record per 
line,
- * in the following format:
- * "ERROR: &lt;songID trackID&gt; song_title"
- *
- * If no arguments are provided, the example runs with default data from 
{@link MusicProfilesData}.
- */
-@SuppressWarnings("serial")
-public class MusicProfiles implements ProgramDescription {
-
-       /**
-        * Runs the example: filters the play-count triplets, emits the top track per user,
-        * builds the user-user similarity graph and runs label propagation on it.
-        *
-        * @param args the program arguments, see parseParameters
-        * @throws Exception if the job execution fails
-        */
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-               // read the user-song-play triplets
-               DataSet<Tuple3<String, String, Integer>> playRecords = getUserSongTripletsData(env);
-
-               // read the mismatches dataset and extract the songIDs
-               DataSet<Tuple1<String>> badSongIds = getMismatchesData(env).map(new ExtractMismatchSongIds());
-
-               // filter out the mismatches from the triplets dataset
-               DataSet<Tuple3<String, String, Integer>> validRecords = playRecords
-                               .coGroup(badSongIds).where(1).equalTo(0)
-                               .with(new FilterOutMismatches());
-
-               // create a user -> song weighted bipartite graph where the edge weights
-               // correspond to play counts
-               Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validRecords, env);
-
-               // get the top track (most listened) for each user
-               DataSet<Tuple2<String, String>> topTrackPerUser = userSongGraph
-                               .groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
-                               .filter(new FilterSongNodes());
-
-               if (fileOutput) {
-                       topTrackPerUser.writeAsCsv(topTracksOutputPath, "\n", "\t");
-               } else {
-                       topTrackPerUser.print();
-               }
-
-               // keeps only the user-song edges whose play count is above the threshold
-               FilterFunction<Edge<String, Integer>> abovePlaycountThreshold = new FilterFunction<Edge<String, Integer>>() {
-                       @Override
-                       public boolean filter(Edge<String, Integer> edge) {
-                               return edge.getValue() > playcountThreshold;
-                       }
-               };
-
-               // create a user-user similarity graph, based on common songs, i.e. two
-               // users that listen to the same song are connected. For each song, we
-               // create an edge between each pair of its in-neighbors.
-               DataSet<Edge<String, NullValue>> similarUserEdges = userSongGraph
-                               .getEdges()
-                               .filter(abovePlaycountThreshold)
-                               .groupBy(1)
-                               .reduceGroup(new CreateSimilarUserEdges())
-                               .distinct();
-
-               // every vertex of the similarity graph starts with the value 1
-               MapFunction<String, Long> initialVertexValue = new MapFunction<String, Long>() {
-                       @Override
-                       public Long map(String value) {
-                               return 1L;
-                       }
-               };
-               Graph<String, Long, NullValue> similarityGraph =
-                               Graph.fromDataSet(similarUserEdges, initialVertexValue, env).getUndirected();
-
-               // detect user communities using the label propagation library method:
-               // initialize each vertex with a unique numeric label and run the algorithm
-               MapFunction<Tuple2<Long, String>, Tuple2<String, Long>> swapToIdAndLabel =
-                               new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
-                       @Override
-                       public Tuple2<String, Long> map(Tuple2<Long, String> labelAndId) throws Exception {
-                               return new Tuple2<String, Long>(labelAndId.f1, labelAndId.f0);
-                       }
-               };
-               DataSet<Tuple2<String, Long>> initialLabels = DataSetUtils
-                               .zipWithUniqueId(similarityGraph.getVertexIds())
-                               .map(swapToIdAndLabel);
-
-               // the joined label replaces the vertex value
-               VertexJoinFunction<Long, Long> takeInitialLabel = new VertexJoinFunction<Long, Long>() {
-                       @Override
-                       public Long vertexJoin(Long vertexValue, Long inputValue) {
-                               return inputValue;
-                       }
-               };
-               DataSet<Vertex<String, Long>> communities = similarityGraph
-                               .joinWithVertices(initialLabels, takeInitialLabel)
-                               .run(new LabelPropagation<String, Long, NullValue>(maxIterations));
-
-               if (!fileOutput) {
-                       communities.print();
-                       return;
-               }
-
-               communities.writeAsCsv(communitiesOutputPath, "\n", "\t");
-
-               // since file sinks are lazy, we trigger the execution explicitly
-               env.execute();
-       }
-
-       public static final class ExtractMismatchSongIds implements 
MapFunction<String, Tuple1<String>> {
-
-               public Tuple1<String> map(String value) {
-                       String[] tokens = value.split("\\s+");
-                       String songId = tokens[1].substring(1);
-                       return new Tuple1<String>(songId);
-               }
-       }
-
-       public static final class FilterOutMismatches implements 
CoGroupFunction<Tuple3<String, String, Integer>,
-               Tuple1<String>, Tuple3<String, String, Integer>> {
-
-               public void coGroup(Iterable<Tuple3<String, String, Integer>> 
triplets,
-                               Iterable<Tuple1<String>> invalidSongs, 
Collector<Tuple3<String, String, Integer>> out) {
-
-                       if (!invalidSongs.iterator().hasNext()) {
-                               // this is a valid triplet
-                               for (Tuple3<String, String, Integer> triplet : 
triplets) {
-                                       out.collect(triplet);
-                               }
-                       }
-               }
-       }
-
-       public static final class FilterSongNodes implements 
FilterFunction<Tuple2<String, String>> {
-               public boolean filter(Tuple2<String, String> value) throws 
Exception {
-                       return !value.f1.equals("");
-               }
-       }
-
-       public static final class GetTopSongPerUser     implements 
EdgesFunctionWithVertexValue<String, NullValue, Integer,
-               Tuple2<String, String>> {
-
-               public void iterateEdges(Vertex<String, NullValue> vertex,
-                               Iterable<Edge<String, Integer>> edges, 
Collector<Tuple2<String, String>> out) throws Exception {
-
-                       int maxPlaycount = 0;
-                       String topSong = "";
-                       for (Edge<String, Integer> edge : edges) {
-                               if (edge.getValue() > maxPlaycount) {
-                                       maxPlaycount = edge.getValue();
-                                       topSong = edge.getTarget();
-                               }
-                       }
-                       out.collect(new Tuple2<String, String>(vertex.getId(), 
topSong));
-               }
-       }
-
-       public static final class CreateSimilarUserEdges implements 
GroupReduceFunction<Edge<String, Integer>,
-               Edge<String, NullValue>> {
-
-               public void reduce(Iterable<Edge<String, Integer>> edges, 
Collector<Edge<String, NullValue>> out) {
-                       List<String> listeners = new ArrayList<String>();
-                       for (Edge<String, Integer> edge : edges) {
-                               listeners.add(edge.getSource());
-                       }
-                       for (int i = 0; i < listeners.size() - 1; i++) {
-                               for (int j = i + 1; j < listeners.size(); j++) {
-                                       out.collect(new Edge<String, 
NullValue>(listeners.get(i),
-                                                       listeners.get(j), 
NullValue.getInstance()));
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Music Profiles Example";
-       }
-
-       // ******************************************************************************************************************
-       // UTIL METHODS
-       // ******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static String userSongTripletsInputPath = null;
-
-       private static String mismatchesInputPath = null;
-
-       private static String topTracksOutputPath = null;
-
-       private static int playcountThreshold = 0;
-
-       private static String communitiesOutputPath = null;
-
-       private static int maxIterations = 10;
-
-       private static boolean parseParameters(String[] args) {
-
-               if(args.length > 0) {
-                       if(args.length != 6) {
-                               System.err.println("Usage: MusicProfiles <input 
user song triplets path>" +
-                                               " <input song mismatches path> 
<output top tracks path> "
-                                               + "<playcount threshold> 
<output communities path> <num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       userSongTripletsInputPath = args[0];
-                       mismatchesInputPath = args[1];
-                       topTracksOutputPath = args[2];
-                       playcountThreshold = Integer.parseInt(args[3]);
-                       communitiesOutputPath = args[4];
-                       maxIterations = Integer.parseInt(args[5]);
-               } else {
-                       System.out.println("Executing Music Profiles example 
with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("Usage: MusicProfiles <input user 
song triplets path>" +
-                                       " <input song mismatches path> <output 
top tracks path> "
-                                       + "<playcount threshold> <output 
communities path> <num iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Tuple3<String, String, Integer>> 
getUserSongTripletsData(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(userSongTripletsInputPath)
-                                       
.lineDelimiter("\n").fieldDelimiter("\t")
-                                       .types(String.class, String.class, 
Integer.class);
-               } else {
-                       return MusicProfilesData.getUserSongTriplets(env);
-               }
-       }
-
-       private static DataSet<String> getMismatchesData(ExecutionEnvironment 
env) {
-               if (fileOutput) {
-                       return env.readTextFile(mismatchesInputPath);
-               } else {
-                       return MusicProfilesData.getMismatches(env);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
deleted file mode 100644
index ba84e80..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use Gelly's scatter-gather iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a gather-sum-apply implementation of the same algorithm, please refer to {@link GSASingleSourceShortestPaths}.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
- */
-public class SingleSourceShortestPaths implements ProgramDescription {
-
-       public static void main(String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
-
-               // Execute the scatter-gather iteration
-               Graph<Long, Double, Double> result = 
graph.runScatterGatherIteration(
-                               new VertexDistanceUpdater(), new 
MinDistanceMessenger(), maxIterations);
-
-               // Extract the vertices as the result
-               DataSet<Vertex<Long, Double>> singleSourceShortestPaths = 
result.getVertices();
-
-               // emit result
-               if (fileOutput) {
-                       singleSourceShortestPaths.writeAsCsv(outputPath, "\n", 
",");
-
-                       // since file sinks are lazy, we trigger the execution 
explicitly
-                       env.execute("Single Source Shortest Paths Example");
-               } else {
-                       singleSourceShortestPaths.print();
-               }
-
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Single Source Shortest Path UDFs
-       // --------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("serial")
-       private static final class InitVertices implements MapFunction<Long, 
Double>{
-
-               private long srcId;
-
-               public InitVertices(long srcId) {
-                       this.srcId = srcId;
-               }
-
-               public Double map(Long id) {
-                       if (id.equals(srcId)) {
-                               return 0.0;
-                       }
-                       else {
-                               return Double.POSITIVE_INFINITY;
-                       }
-               }
-       }
-
-       /**
-        * Function that updates the value of a vertex by picking the minimum
-        * distance from all incoming messages.
-        */
-       @SuppressWarnings("serial")
-       public static final class VertexDistanceUpdater extends 
VertexUpdateFunction<Long, Double, Double> {
-
-               @Override
-               public void updateVertex(Vertex<Long, Double> vertex, 
MessageIterator<Double> inMessages) {
-
-                       Double minDistance = Double.MAX_VALUE;
-
-                       for (double msg : inMessages) {
-                               if (msg < minDistance) {
-                                       minDistance = msg;
-                               }
-                       }
-
-                       if (vertex.getValue() > minDistance) {
-                               setNewVertexValue(minDistance);
-                       }
-               }
-       }
-
-       /**
-        * Distributes the minimum distance associated with a given vertex among all
-        * the target vertices summed up with the edge's value.
-        */
-       @SuppressWarnings("serial")
-       public static final class MinDistanceMessenger extends 
MessagingFunction<Long, Double, Double, Double> {
-
-               @Override
-               public void sendMessages(Vertex<Long, Double> vertex) {
-                       if (vertex.getValue() < Double.POSITIVE_INFINITY) {
-                               for (Edge<Long, Double> edge : getEdges()) {
-                                       sendMessageTo(edge.getTarget(), 
vertex.getValue() + edge.getValue());
-                               }
-                       }
-               }
-       }
-
-       // ******************************************************************************************************************
-       // UTIL METHODS
-       // ******************************************************************************************************************
-
-       private static boolean fileOutput = false;
-
-       private static Long srcVertexId = 1l;
-
-       private static String edgesInputPath = null;
-
-       private static String outputPath = null;
-
-       private static int maxIterations = 5;
-
-       private static boolean parseParameters(String[] args) {
-
-               if(args.length > 0) {
-                       if(args.length != 4) {
-                               System.err.println("Usage: 
SingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-                               return false;
-                       }
-
-                       fileOutput = true;
-                       srcVertexId = Long.parseLong(args[0]);
-                       edgesInputPath = args[1];
-                       outputPath = args[2];
-                       maxIterations = Integer.parseInt(args[3]);
-               } else {
-                               System.out.println("Executing Single Source 
Shortest Paths example "
-                                               + "with default parameters and 
built-in default data.");
-                               System.out.println("  Provide parameters to 
read input data from files.");
-                               System.out.println("  See the documentation for 
the correct format of input files.");
-                               System.out.println("Usage: 
SingleSourceShortestPaths <source vertex id>" +
-                                               " <input edges path> <output 
path> <num iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .lineDelimiter("\n")
-                                       .fieldDelimiter("\t")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
-               } else {
-                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-       @Override
-       public String getDescription() {
-               return "Scatter-gather Single Source Shortest Paths";
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
deleted file mode 100644
index c37b2b5..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data set used for the Simple Community Detection test program.
- * If no parameters are given to the program, the default edge data set is used.
- */
-public class CommunityDetectionData {
-
-       // the algorithm is not guaranteed to always converge
-       public static final Integer MAX_ITERATIONS = 30;
-
-       public static final double DELTA = 0.5f;
-
-       public static final String COMMUNITIES_SINGLE_ITERATION = "1,5\n" + 
"2,6\n"
-                       + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + 
"8,7"; 
-
-       public static final String COMMUNITIES_WITH_TIE = "1,2\n" + "2,1\n" + 
"3,1\n" + "4,1\n" + "5,1";
-
-       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
-               edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
-               edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
-               edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
-               edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
-               edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
-               edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
-               edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
-               edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
-               edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
-               edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
-               edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
-               edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
-               edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
-               edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
-               edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
-
-               return env.fromCollection(edges);
-       }
-
-       public static DataSet<Edge<Long, Double>> 
getSimpleEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
-               edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
-               edges.add(new Edge<Long, Double>(1L, 5L, 4.0));
-               edges.add(new Edge<Long, Double>(2L, 6L, 5.0));
-               edges.add(new Edge<Long, Double>(6L, 7L, 6.0));
-               edges.add(new Edge<Long, Double>(6L, 8L, 7.0));
-               edges.add(new Edge<Long, Double>(7L, 8L, 8.0));
-
-               return env.fromCollection(edges);
-       }
-
-       private CommunityDetectionData() {}
-
-       public static DataSet<Edge<Long, Double>> 
getTieEdgeDataSet(ExecutionEnvironment env) {
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 5L, 1.0));
-
-               return env.fromCollection(edges);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
deleted file mode 100644
index 67864eb..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ConnectedComponentsDefaultData.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the connected components example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class ConnectedComponentsDefaultData {
-
-       public static final Integer MAX_ITERATIONS = 4;
-
-       public static final String EDGES = "1   2\n" + "2       3\n" + "2       
4\n" + "3       4";
-
-       public static final Object[][] DEFAULT_EDGES = new Object[][] {
-               new Object[]{1L, 2L},
-               new Object[]{2L, 3L},
-               new Object[]{2L, 4L},
-               new Object[]{3L, 4L}
-       };
-
-       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-               List<Edge<Long, NullValue>> edgeList = new 
LinkedList<Edge<Long, NullValue>>();
-               for (Object[] edge : DEFAULT_EDGES) {
-                       edgeList.add(new Edge<Long, NullValue>((Long) edge[0], 
(Long) edge[1], NullValue.getInstance()));
-               }
-               return env.fromCollection(edgeList);
-       }
-
-       public static final String VERTICES_WITH_MIN_ID = "1,1\n" + "2,1\n" + 
"3,1\n" + "4,1";
-
-       private ConnectedComponentsDefaultData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
deleted file mode 100644
index 80765bf..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EuclideanGraphData.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.EuclideanGraphWeighing;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the Euclidean Graph example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class EuclideanGraphData {
-
-       public static final int NUM_VERTICES = 9;
-
-       public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + 
"3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
-                       "6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + 
"9,9.0,9.0";
-
-       public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> 
getDefaultVertexDataSet(ExecutionEnvironment env) {
-
-               List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new 
ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>();
-               for(int i=1; i<=NUM_VERTICES; i++) {
-                       vertices.add(new Vertex<Long, 
EuclideanGraphWeighing.Point>(new Long(i),
-                                       new EuclideanGraphWeighing.Point(new 
Double(i), new Double(i))));
-               }
-
-               return env.fromCollection(vertices);
-       }
-
-       public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + 
"2,4\n" + "2,5\n" +
-                       "3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + 
"6,7\n" + "6,8\n" +
-                       "7,8\n" + "7,9\n" +  "8,9";
-
-       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
-               edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
-               edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
-               edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
-               edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
-               edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
-               edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
-               edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
-               edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
-               edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
-               edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
-               edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
-               edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
-               edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final String RESULTED_WEIGHTED_EDGES = 
"1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
-                       "2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" 
+ "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
-                       "4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" 
+ "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
-                       "6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" 
+ "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
-                       "8,9,1.4142135623730951";
-
-       private EuclideanGraphData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
deleted file mode 100644
index 7fbee46..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-public class ExampleUtils {
-
-       @SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-       public static void printResult(DataSet set, String msg) {
-               set.output(new PrintingOutputFormatWithMessage(msg) {
-               });
-       }
-
-       public static class PrintingOutputFormatWithMessage<T> implements
-                       OutputFormat<T> {
-
-               private static final long serialVersionUID = 1L;
-
-               private transient PrintStream stream;
-
-               private transient String prefix;
-
-               private String message;
-
-               // 
--------------------------------------------------------------------------------------------
-
-               /**
-                * Instantiates a printing output format that prints to 
standard out.
-                */
-               public PrintingOutputFormatWithMessage() {
-               }
-
-               public PrintingOutputFormatWithMessage(String msg) {
-                       this.message = msg;
-               }
-
-               @Override
-               public void open(int taskNumber, int numTasks) {
-                       // get the target stream
-                       this.stream = System.out;
-
-                       // set the prefix to message
-                       this.prefix = message + ": ";
-               }
-
-               @Override
-               public void writeRecord(T record) {
-                       if (this.prefix != null) {
-                               this.stream.println(this.prefix + 
record.toString());
-                       } else {
-                               this.stream.println(record.toString());
-                       }
-               }
-
-               @Override
-               public void close() {
-                       this.stream = null;
-                       this.prefix = null;
-               }
-
-               @Override
-               public String toString() {
-                       return "Print to System.out";
-               }
-
-               @Override
-               public void configure(Configuration parameters) {
-               }
-       }
-
-       @SuppressWarnings("serial")
-       public static DataSet<Vertex<Long, NullValue>> getVertexIds(
-                       ExecutionEnvironment env, final long numVertices) {
-               return env.generateSequence(1, numVertices).map(
-                               new MapFunction<Long, Vertex<Long, 
NullValue>>() {
-                                       public Vertex<Long, NullValue> map(Long 
l) {
-                                               return new Vertex<Long, 
NullValue>(l, NullValue
-                                                               .getInstance());
-                                       }
-                               });
-       }
-
-       @SuppressWarnings("serial")
-       public static DataSet<Edge<Long, NullValue>> getRandomEdges(
-                       ExecutionEnvironment env, final long numVertices) {
-               return env.generateSequence(1, numVertices).flatMap(
-                               new FlatMapFunction<Long, Edge<Long, 
NullValue>>() {
-                                       @Override
-                                       public void flatMap(Long key, 
Collector<Edge<Long, NullValue>> out) throws Exception {
-                                               int numOutEdges = (int) 
(Math.random() * (numVertices / 2));
-                                               for (int i = 0; i < 
numOutEdges; i++) {
-                                                       long target = (long) 
(Math.random() * numVertices) + 1;
-                                                       out.collect(new 
Edge<Long, NullValue>(key, target,
-                                                                       
NullValue.getInstance()));
-                                               }
-                                       }
-                               });
-       }
-
-       public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
-                       ExecutionEnvironment env) {
-               List<Vertex<Long, Double>> vertices = new 
ArrayList<Vertex<Long, Double>>();
-               vertices.add(new Vertex<Long, Double>(1L, 1.0));
-               vertices.add(new Vertex<Long, Double>(2L, 2.0));
-               vertices.add(new Vertex<Long, Double>(3L, 3.0));
-               vertices.add(new Vertex<Long, Double>(4L, 4.0));
-               vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-               return env.fromCollection(vertices);
-       }
-
-       public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
-                       ExecutionEnvironment env) {
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-               edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-               edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-               edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-               return env.fromCollection(edges);
-       }
-
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private ExampleUtils() {
-               throw new RuntimeException();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
deleted file mode 100644
index 7b69ec0..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the IncrementalSSSP example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class IncrementalSSSPData {
-
-       public static final int NUM_VERTICES = 5;
-
-       public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" 
+ "4,1.0\n" + "5,0.0";
-
-       public static DataSet<Vertex<Long, Double>> 
getDefaultVertexDataSet(ExecutionEnvironment env) {
-
-               List<Vertex<Long, Double>> vertices = new 
ArrayList<Vertex<Long, Double>>();
-               vertices.add(new Vertex<Long, Double>(1L, 6.0));
-               vertices.add(new Vertex<Long, Double>(2L, 2.0));
-               vertices.add(new Vertex<Long, Double>(3L, 3.0));
-               vertices.add(new Vertex<Long, Double>(4L, 1.0));
-               vertices.add(new Vertex<Long, Double>(5L, 0.0));
-
-               return env.fromCollection(vertices);
-       }
-
-       public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + 
"2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" +
-                       "4,5,1.0";
-
-       public static final DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
-               edges.add(new Edge<Long, Double>(2L, 4L, 3.0));
-               edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
-               edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 5.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + 
"3,2,1.0\n" + "4,5,1.0";
-
-       public static final DataSet<Edge<Long, Double>> 
getDefaultEdgesInSSSP(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
-               edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
-               edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final String SRC_EDGE_TO_BE_REMOVED = "2";
-
-       public static final String TRG_EDGE_TO_BE_REMOVED = "5";
-
-       public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
-
-       public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
-
-               return new Edge<Long, Double>(2L, 5L, 2.0);
-       }
-
-       public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE 
+ "\n" + "2," + Double.MAX_VALUE+ "\n"
-                       + "3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0";
-
-       private IncrementalSSSPData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
deleted file mode 100644
index 7564b95..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the Jaccard Similarity Measure 
example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class JaccardSimilarityMeasureData {
-
-       public static final String EDGES = "1   2\n" + "1       3\n" + "1       
4\n" + "1       5\n" + "2       3\n" + "2       4\n" +
-                       "2      5\n" + "3       4\n" + "3       5\n" + "4       
5";
-
-       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(1L, 2L, new Double(0)));
-               edges.add(new Edge<Long, Double>(1L, 3L, new Double(0)));
-               edges.add(new Edge<Long, Double>(1L, 4L, new Double(0)));
-               edges.add(new Edge<Long, Double>(1L, 5L, new Double(0)));
-               edges.add(new Edge<Long, Double>(2L, 3L, new Double(0)));
-               edges.add(new Edge<Long, Double>(2L, 4L, new Double(0)));
-               edges.add(new Edge<Long, Double>(2L, 5L, new Double(0)));
-               edges.add(new Edge<Long, Double>(3L, 4L, new Double(0)));
-               edges.add(new Edge<Long, Double>(3L, 5L, new Double(0)));
-               edges.add(new Edge<Long, Double>(4L, 5L, new Double(0)));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + 
"1,4,0.6\n" + "1,5,0.6\n" +
-                       "2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + 
"3,5,0.6\n" + "4,5,0.6";
-
-       private JaccardSimilarityMeasureData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
deleted file mode 100644
index 0a92097..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-
-/**
- * Provides the default data set used for the Label Propagation test program.
- * If no parameters are given to the program, the default edge data set is 
used.
- */
-public class LabelPropagationData {
-       
-       public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
-                       "2,10\n" +
-                       "3,10\n" +
-                       "4,40\n" +
-                       "5,40\n" +
-                       "6,40\n" +
-                       "7,40\n";
-
-       public static final String LABELS_WITH_TIE ="1,10\n" +
-                       "2,10\n" +
-                       "3,10\n" +
-                       "4,10\n" +
-                       "5,20\n" +
-                       "6,20\n" +
-                       "7,20\n" +
-                       "8,20\n" +
-                       "9,20\n";
-
-       private LabelPropagationData() {}
-
-       public static final DataSet<Vertex<Long, Long>> 
getDefaultVertexSet(ExecutionEnvironment env) {
-
-               List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, 
Long>>();
-               vertices.add(new Vertex<Long, Long>(1l, 10l));
-               vertices.add(new Vertex<Long, Long>(2l, 10l));
-               vertices.add(new Vertex<Long, Long>(3l, 30l));
-               vertices.add(new Vertex<Long, Long>(4l, 40l));
-               vertices.add(new Vertex<Long, Long>(5l, 40l));
-               vertices.add(new Vertex<Long, Long>(6l, 40l));
-               vertices.add(new Vertex<Long, Long>(7l, 40l));
-
-               return env.fromCollection(vertices);
-       }
-
-       public static final DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, 
NullValue>>();
-               edges.add(new Edge<Long, NullValue>(1L, 3L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 3L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(4L, 7L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(5L, 7L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(6L, 7L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(7L, 3L, 
NullValue.getInstance()));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final DataSet<Vertex<Long, Long>> 
getTieVertexSet(ExecutionEnvironment env) {
-
-               List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, 
Long>>();
-               vertices.add(new Vertex<Long, Long>(1l, 10l));
-               vertices.add(new Vertex<Long, Long>(2l, 10l));
-               vertices.add(new Vertex<Long, Long>(3l, 10l));
-               vertices.add(new Vertex<Long, Long>(4l, 10l));
-               vertices.add(new Vertex<Long, Long>(5l, 0l));
-               vertices.add(new Vertex<Long, Long>(6l, 20l));
-               vertices.add(new Vertex<Long, Long>(7l, 20l));
-               vertices.add(new Vertex<Long, Long>(8l, 20l));
-               vertices.add(new Vertex<Long, Long>(9l, 20l));
-
-               return env.fromCollection(vertices);
-       }
-
-       public static final DataSet<Edge<Long, NullValue>> 
getTieEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, 
NullValue>>();
-               edges.add(new Edge<Long, NullValue>(1L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(4L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(5L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(6L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(7L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(8L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(9L, 5L, 
NullValue.getInstance()));
-
-               return env.fromCollection(edges);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
deleted file mode 100644
index 3a97d1f..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * Provides the default data sets used for the Music Profiles example program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class MusicProfilesData {
-
-       public static DataSet<Tuple3<String, String, Integer>> 
getUserSongTriplets(ExecutionEnvironment env) {
-               List<Tuple3<String, String, Integer>> triplets = new 
ArrayList<Tuple3<String, String, Integer>>();
-               
-               triplets.add(new Tuple3<String, String, Integer>("user_1", 
"song_1", 100));
-               triplets.add(new Tuple3<String, String, Integer>("user_1", 
"song_2", 10));
-               triplets.add(new Tuple3<String, String, Integer>("user_1", 
"song_3", 20));
-               triplets.add(new Tuple3<String, String, Integer>("user_1", 
"song_4", 30));
-               triplets.add(new Tuple3<String, String, Integer>("user_1", 
"song_5", 1));
-               
-               triplets.add(new Tuple3<String, String, Integer>("user_2", 
"song_6", 40));
-               triplets.add(new Tuple3<String, String, Integer>("user_2", 
"song_7", 10));
-               triplets.add(new Tuple3<String, String, Integer>("user_2", 
"song_8", 3));
-               
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_1", 100));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_2", 10));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_3", 20));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_8", 30));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_9", 1));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_10", 8));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_11", 90));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_12", 30));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_13", 34));
-               triplets.add(new Tuple3<String, String, Integer>("user_3", 
"song_14", 17));
-               
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_1", 100));
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_6", 10));
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_8", 20));
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_12", 30));
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_13", 1));
-               triplets.add(new Tuple3<String, String, Integer>("user_4", 
"song_15", 1));
-               
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_3", 300));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_4", 4));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_5", 5));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_8", 8));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_9", 9));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_10", 10));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_12", 12));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_13", 13));
-               triplets.add(new Tuple3<String, String, Integer>("user_5", 
"song_15", 15));
-
-               triplets.add(new Tuple3<String, String, Integer>("user_6", 
"song_6", 30));
-
-               return env.fromCollection(triplets);
-       }
-       
-       public static DataSet<String> getMismatches(ExecutionEnvironment env) {
-               List<String> errors = new ArrayList<String>();
-               errors.add("ERROR: <song_8 track_8> Sever");
-               errors.add("ERROR: <song_15 track_15> Black Trees");
-               return env.fromCollection(errors);
-       }
-
-       public static final String USER_SONG_TRIPLETS = "user_1 song_1  100\n" 
+ "user_1        song_5  200\n"
-                       + "user_2       song_1  10\n" + "user_2 song_4  20\n"
-                       + "user_3       song_2  3\n"
-                       + "user_4       song_2  1\n" + "user_4  song_3  2\n"
-                       + "user_5       song_3  30";
-
-       public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
-
-       public static final String MAX_ITERATIONS = "2";
-
-       public static final String TOP_SONGS_RESULT = "user_1   song_1\n" +
-                                                               "user_2 
song_4\n" +
-                                                               "user_3 
song_2\n" +
-                                                               "user_4 
song_3\n" +
-                                                               "user_5 song_3";
-
-       public static final String COMMUNITIES_RESULT = "user_1 1\n" +
-                                                               "user_2 1\n" +
-                                                               "user_3 3\n" +
-                                                               "user_4 3\n" +
-                                                               "user_5 4";
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
deleted file mode 100644
index 58d4f8b..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-
-/**
- * Provides the default data set used for the PageRank test program.
- * If no parameters are given to the program, the default edge data set is 
used.
- */
-public class PageRankData {
-       
-       public static final String EDGES = "2   1\n" +
-                                                                               
"5      2\n" + 
-                                                                               
"5      4\n" +
-                                                                               
"4      3\n" +
-                                                                               
"4      2\n" +
-                                                                               
"1      4\n" +
-                                                                               
"1      2\n" +
-                                                                               
"1      3\n" +
-                                                                               
"3      5\n";
-
-       
-       public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
-                                                                               
                                "2,0.248\n" + 
-                                                                               
                                "3,0.173\n" +
-                                                                               
                                "4,0.175\n" +
-                                                                               
                                "5,0.165\n";
-
-       private PageRankData() {}
-
-       public static final DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, 
Double>>();
-               edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
-               edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
-               edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
-               edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
-               edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
-               edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
-
-               return env.fromCollection(edges);
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
deleted file mode 100644
index 6b985c5..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SingleSourceShortestPathsData.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-
-/**
- * Provides the default data set used for the Single Source Shortest Paths 
example program.
- * If no parameters are given to the program, the default edge data set is 
used.
- */
-public class SingleSourceShortestPathsData {
-
-       public static final Long SRC_VERTEX_ID = 1L;
-
-       public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + 
"2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
-                                       "4\t5\t45.0\n" + "5\t1\t51.0";
-
-       public static final Object[][] DEFAULT_EDGES = new Object[][] {
-               new Object[]{1L, 2L, 12.0},
-               new Object[]{1L, 3L, 13.0},
-               new Object[]{2L, 3L, 23.0},
-               new Object[]{3L, 4L, 34.0},
-               new Object[]{3L, 5L, 35.0},
-               new Object[]{4L, 5L, 45.0},
-               new Object[]{5L, 1L, 51.0}
-       };
-
-       public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =  
"1,0.0\n" + "2,12.0\n" + "3,13.0\n" + 
-                                                               "4,47.0\n" + 
"5,48.0";
-
-       public static DataSet<Edge<Long, Double>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-               
-               List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, 
Double>>();
-               for (Object[] edge : DEFAULT_EDGES) {
-                       edgeList.add(new Edge<Long, Double>((Long) edge[0], 
(Long) edge[1], (Double) edge[2]));
-               }
-               return env.fromCollection(edgeList);
-       }
-
-       private SingleSourceShortestPathsData() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
deleted file mode 100644
index 88f76cc..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SummarizationData.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import com.google.common.collect.Lists;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-
-import java.util.List;
-
-/**
- * Provides the default data set used for Summarization tests.
- */
-public class SummarizationData {
-
-       private SummarizationData() {}
-
-       /**
-        * The resulting vertex id can be any id of the vertices summarized by 
the single vertex.
-        *
-        * Format:
-        *
-        * "possible-id[,possible-id];group-value,group-count"
-        */
-       public static final String[] EXPECTED_VERTICES = new String[] {
-                       "0,1;A,2",
-                       "2,3,4;B,3",
-                       "5;C,1"
-       };
-
-       /**
-        * Format:
-        *
-        * 
"possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
-        */
-       public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
-                       "0,1;0,1;A,2",
-                       "0,1;2,3,4;A,1",
-                       "2,3,4;0,1;A,1",
-                       "2,3,4;0,1;C,2",
-                       "2,3,4;2,3,4;B,2",
-                       "5;2,3,4;D,2"
-       };
-
-       /**
-        * Format:
-        *
-        * 
"possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
-        */
-       public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new 
String[] {
-                       "0,1;0,1;(null),2",
-                       "0,1;2,3,4;(null),1",
-                       "2,3,4;0,1;(null),3",
-                       "2,3,4;2,3,4;(null),2",
-                       "5;2,3,4;(null),2"
-       };
-
-       /**
-        * Creates a set of vertices with attached {@link String} values.
-        *
-        * @param env execution environment
-        * @return vertex data set with string values
-        */
-       public static DataSet<Vertex<Long, String>> 
getVertices(ExecutionEnvironment env) {
-               List<Vertex<Long, String>> vertices = 
Lists.newArrayListWithExpectedSize(6);
-               vertices.add(new Vertex<>(0L, "A"));
-               vertices.add(new Vertex<>(1L, "A"));
-               vertices.add(new Vertex<>(2L, "B"));
-               vertices.add(new Vertex<>(3L, "B"));
-               vertices.add(new Vertex<>(4L, "B"));
-               vertices.add(new Vertex<>(5L, "C"));
-
-               return env.fromCollection(vertices);
-       }
-
-       /**
-        * Creates a set of edges with attached {@link String} values.
-        *
-        * @param env execution environment
-        * @return edge data set with string values
-        */
-       public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment 
env) {
-               List<Edge<Long, String>> edges = 
Lists.newArrayListWithExpectedSize(10);
-               edges.add(new Edge<>(0L, 1L, "A"));
-               edges.add(new Edge<>(1L, 0L, "A"));
-               edges.add(new Edge<>(1L, 2L, "A"));
-               edges.add(new Edge<>(2L, 1L, "A"));
-               edges.add(new Edge<>(2L, 3L, "B"));
-               edges.add(new Edge<>(3L, 2L, "B"));
-               edges.add(new Edge<>(4L, 0L, "C"));
-               edges.add(new Edge<>(4L, 1L, "C"));
-               edges.add(new Edge<>(5L, 2L, "D"));
-               edges.add(new Edge<>(5L, 3L, "D"));
-
-               return env.fromCollection(edges);
-       }
-
-       /**
-        * Creates a set of edges with {@link NullValue} as edge value.
-        *
-        * @param env execution environment
-        * @return edge data set with null values
-        */
-       @SuppressWarnings("serial")
-       public static DataSet<Edge<Long, NullValue>> 
getEdgesWithAbsentValues(ExecutionEnvironment env) {
-               return getEdges(env).map(new MapFunction<Edge<Long, String>, 
Edge<Long, NullValue>>() {
-                       @Override
-                       public Edge<Long, NullValue> map(Edge<Long, String> 
value) throws Exception {
-                               return new Edge<>(value.getSource(), 
value.getTarget(), NullValue.getInstance());
-                       }
-               });
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
deleted file mode 100644
index c8cea12..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example.utils;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.types.NullValue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Provides the default data sets used for the Triangle Count test program.
- * If no parameters are given to the program, the default data sets are used.
- */
-public class TriangleCountData {
-
-       public static final String EDGES = "1   2\n"+"1 3\n"+"2 3\n"+"2 6\n"+"3 
4\n"+"3 5\n"+"3 6\n"+"4 5\n"+"6 7\n";
-
-       public static DataSet<Edge<Long, NullValue>> 
getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
-               List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, 
NullValue>>();
-               edges.add(new Edge<Long, NullValue>(1L, 2L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(1L, 3L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 3L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(2L, 6L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(3L, 4L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(3L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(3L, 6L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(4L, 5L, 
NullValue.getInstance()));
-               edges.add(new Edge<Long, NullValue>(6L, 7L, 
NullValue.getInstance()));
-
-               return env.fromCollection(edges);
-       }
-
-       public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
-
-       public static List<Tuple3<Long,Long,Long>> getListOfTriangles() {
-               ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
-               ret.add(new Tuple3<>(1L,2L,3L));
-               ret.add(new Tuple3<>(2L,3L,6L));
-               ret.add(new Tuple3<>(3L,4L,5L));
-               return ret;
-       }
-
-       private TriangleCountData () {}
-}

Reply via email to