Repository: flink
Updated Branches:
  refs/heads/master bc21de2e4 -> d01d36970


[FLINK-1520] [gelly] Read edges and vertices from CSV files

This squashes the following commit:

[FLINK-1520] [gelly]Changed the methods for specifying types.
Created a new file for tests. Made appropriate changes in gelly_guide.md


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/702277fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/702277fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/702277fa

Branch: refs/heads/master
Commit: 702277faa47baf34133cdf3c7d13b13ac62887dd
Parents: bc21de2
Author: Shivani <[email protected]>
Authored: Wed Jun 17 15:37:36 2015 +0200
Committer: vasia <[email protected]>
Committed: Thu Sep 24 11:08:56 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  17 +
 .../main/java/org/apache/flink/graph/Graph.java |  46 ++
 .../org/apache/flink/graph/GraphCsvReader.java  | 462 +++++++++++++++++++
 .../graph/example/ConnectedComponents.java      |  49 +-
 .../example/GSASingleSourceShortestPaths.java   |  24 +-
 .../flink/graph/example/GraphMetrics.java       |  27 +-
 .../flink/graph/example/IncrementalSSSP.java    |  68 +--
 .../example/SingleSourceShortestPaths.java      |   2 +-
 .../operations/GraphCreationWithCsvITCase.java  | 214 +++++++++
 9 files changed, 805 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 562df31..b7af56b 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -104,6 +104,23 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = 
env.readCsvFile("path/to/ed
 Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, 
edgeTuples, env);
 {% endhighlight %}
 
+* from a CSV file of edges and an optional CSV file of vertices. In this case, 
Gelly will convert each row of the edges CSV file to an `Edge`, where the first 
field will be the source ID, the second field will be the target ID and the 
third field (if present) will be the edge value. Equivalently, each row of the 
optional vertices CSV file will be converted to a `Vertex`, where the first 
field will be the vertex ID and the second field (if present) will be the 
vertex value. `fromCsvReader()` returns a `GraphCsvReader`. Call `typesEdges()` 
on it to specify the types of the edge fields; if edges don't have a value, 
pass only the type of the vertex ID. `typesEdges()` returns the same 
`GraphCsvReader`; calling `typesVertices()` (edges with a value) or 
`typesVerticesNullEdge()` (edges without a value) on it returns the `Graph`:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+Graph<String, Long, NullValue> graph = 
Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", 
env).typesEdges(String.class).typesVerticesNullEdge(String.class, Long.class);
+{% endhighlight %}
+
+If vertices don't have a value, use the single-argument overload of 
`typesVertices()` or `typesVerticesNullEdge()`, which takes only the vertex ID type.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+Graph<String, NullValue, Long> graph = 
Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", 
env).typesEdges(String.class, Long.class).typesVertices(String.class);
+{% endhighlight %}
+
+
 * from a `Collection` of edges and an optional `Collection` of vertices:
 
 {% highlight java %}

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 84085c9..0153837 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -282,6 +282,52 @@ public class Graph<K, VV, EV> {
        }
 
        /**
+       * Creates a {@link org.apache.flink.graph.GraphCsvReader} that reads a
+       * graph from two CSV files.
+       *
+       * Vertices with value are created from a CSV file with 2 fields.
+       * Edges with value are created from a CSV file with 3 fields.
+       *
+       * @param verticesPath path to a CSV file with the Vertices data.
+       * @param edgesPath path to a CSV file with the Edges data.
+       * @param context the flink execution environment.
+       * @return An instance of {@link org.apache.flink.graph.GraphCsvReader}.
+       * Call typesEdges() on it to specify the types of the Vertex ID and
+       * Edge value, then typesVertices() or typesVerticesNullEdge() to
+       * obtain the Graph.
+       */
+       public static  GraphCsvReader fromCsvReader(String verticesPath, String 
edgesPath, ExecutionEnvironment context) {
+               return new GraphCsvReader(verticesPath, edgesPath, context);
+       }
+       /**
+       * Creates a {@link org.apache.flink.graph.GraphCsvReader} that reads a
+       * graph from a CSV file for Edges. Vertices are induced from the edges.
+       *
+       * Edges with value are created from a CSV file with 3 fields. Vertices
+       * are created automatically and their values are set to NullValue.
+       *
+       * NOTE(review): the reader created here has neither a vertices file nor
+       * a mapper, so the two-argument typesVertices() and
+       * typesVerticesNullEdge() overloads return null for it; use the
+       * single-argument overloads to obtain the Graph.
+       *
+       * @param edgesPath a path to a CSV file with the Edges data
+       * @param context the flink execution environment.
+       * @return An instance of {@link org.apache.flink.graph.GraphCsvReader}.
+       * Call typesEdges() on it to specify the types of the Vertex ID and
+       * Edge value, then typesVertices() or typesVerticesNullEdge() to
+       * obtain the Graph.
+       */
+       public static GraphCsvReader fromCsvReader(String edgesPath, 
ExecutionEnvironment context) {
+               return new GraphCsvReader(edgesPath, context);
+       }
+
+       /**
+        * Creates a {@link org.apache.flink.graph.GraphCsvReader} that reads a
+        * graph from a CSV file for Edges. Vertices are induced from the edges
+        * and vertex values are calculated by a mapper function.
+        *
+        * Edges with value are created from a CSV file with 3 fields.
+        * Vertices are created automatically and their values are set by
+        * applying the provided map function to the vertex ids.
+        *
+        * NOTE(review): the mapper is only applied by the two-argument
+        * typesVertices() and typesVerticesNullEdge() overloads; the
+        * single-argument overloads do not use it. The parameter is declared
+        * with the raw MapFunction type, so its type arguments are unchecked.
+        *
+        * @param edgesPath a path to a CSV file with the Edges data
+        * @param mapper the mapper function.
+        * @param context the flink execution environment.
+        * @return An instance of {@link org.apache.flink.graph.GraphCsvReader}.
+        * Call typesEdges() on it to specify the types of the Vertex ID and
+        * Edge value, then typesVertices() or typesVerticesNullEdge() to
+        * obtain the Graph.
+        */
+       public static GraphCsvReader fromCsvReader(String edgesPath, final 
MapFunction mapper, ExecutionEnvironment context) {
+               return new GraphCsvReader(edgesPath, mapper, context);
+       }
+
+       /**
         * @return the flink execution environment.
         */
        public ExecutionEnvironment getContext() {

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
new file mode 100644
index 0000000..d4a5b30
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using the path(s) provided to CSV file(s) with
+ * edge (and, optionally, vertex) data.
+ * The class also configures the CSV readers used to read the edge and vertex
+ * data, such as the field types, the delimiters (row and field), the fields
+ * that should be included or skipped, and other flags such as whether to
+ * skip the initial line as the header.
+ * The configuration is done using the functions provided in the
+ * {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+@SuppressWarnings({"unused" , "unchecked"})
+public class GraphCsvReader<K,VV,EV> {
+
+       private final Path vertexPath,edgePath;
+       private final ExecutionEnvironment executionContext;
+       protected CsvReader EdgeReader;
+       protected CsvReader VertexReader;
+       protected MapFunction<K, VV> mapper;
+       protected Class<K> vertexKey;
+       protected Class<VV> vertexValue;
+       protected Class<EV> edgeValue;
+
+//--------------------------------------------------------------------------------------------------------------------
+       /**
+        * Creates a reader with one CsvReader for the vertices file and one
+        * for the edges file. No mapper is set.
+        *
+        * @param vertexPath path to the CSV file with the vertices data.
+        * @param edgePath path to the CSV file with the edges data.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader(Path vertexPath,Path edgePath, 
ExecutionEnvironment context) {
+               this.vertexPath = vertexPath;
+               this.edgePath = edgePath;
+               this.VertexReader = new CsvReader(vertexPath,context);
+               this.EdgeReader = new CsvReader(edgePath,context);
+               this.mapper=null;
+               this.executionContext=context;
+       }
+
+       /**
+        * Creates a reader for an edges file only. The vertex path, the
+        * vertex CsvReader and the mapper are all left null.
+        *
+        * @param edgePath path to the CSV file with the edges data.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+               this.vertexPath = null;
+               this.edgePath = edgePath;
+               this.EdgeReader = new CsvReader(edgePath,context);
+               this.VertexReader = null;
+               this.mapper = null;
+               this.executionContext=context;
+       }
+
+       /**
+        * Creates a reader for an edges file together with a mapper that is
+        * stored for initializing vertex values. The vertex path and the
+        * vertex CsvReader are left null.
+        *
+        * @param edgePath path to the CSV file with the edges data.
+        * @param mapper the function stored as the vertex value initializer.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, 
ExecutionEnvironment context) {
+               this.vertexPath = null;
+               this.edgePath = edgePath;
+               this.EdgeReader = new CsvReader(edgePath,context);
+               this.VertexReader = null;
+               this.mapper = mapper;
+               this.executionContext=context;
+       }
+
+       /**
+        * String-path variant of the edges-only constructor. The path is
+        * passed through Preconditions.checkNotNull before being wrapped in
+        * a Path.
+        *
+        * @param edgePath path to the CSV file with the edges data.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
+               this(new Path(Preconditions.checkNotNull(edgePath, "The file 
path may not be null.")), context);
+
+       }
+
+       /**
+        * String-path variant of the vertices-and-edges constructor. Both
+        * paths are passed through Preconditions.checkNotNull before being
+        * wrapped in a Path.
+        *
+        * @param vertexPath path to the CSV file with the vertices data.
+        * @param edgePath path to the CSV file with the edges data.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader(String vertexPath, String edgePath, 
ExecutionEnvironment context) {
+               this(new Path(Preconditions.checkNotNull(vertexPath, "The file 
path may not be null.")),
+                               new Path(Preconditions.checkNotNull(edgePath, 
"The file path may not be null.")), context);
+       }
+
+
+       /**
+        * String-path variant of the edges-plus-mapper constructor. The path
+        * is passed through Preconditions.checkNotNull before being wrapped
+        * in a Path.
+        *
+        * @param edgePath path to the CSV file with the edges data.
+        * @param mapper the function stored as the vertex value initializer.
+        * @param context the flink execution environment.
+        */
+       public GraphCsvReader (String edgePath, final MapFunction<K, VV> 
mapper, ExecutionEnvironment context) {
+                       this(new Path(Preconditions.checkNotNull(edgePath, "The 
file path may not be null.")),mapper, context);
+       }
+
+       
//--------------------------------------------------------------------------------------------------------------------
+       /**
+        * Specifies the types for the edges fields and returns this instance
+        * of GraphCsvReader. The types are only stored here; the edges file
+        * reader is given them by the typesVertices() and
+        * typesVerticesNullEdge() methods.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @param edgeValue The type of Edge Value in the returned Graph.
+        * @return The {@link org.apache.flink.graph.GraphCsvReader}
+        */
+       public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> 
edgeValue) {
+               this.vertexKey = vertexKey;
+               this.edgeValue = edgeValue;
+               return this;
+       }
+
+       /**
+        * Specifies the types for the edges fields and returns this instance
+        * of GraphCsvReader.
+        * This method is overloaded for the case when the type of EdgeValue is
+        * NullValue: the stored edge value type is set to null.
+        *
+        * NOTE(review): after this call, typesVertices() would hand the null
+        * edge value type to the edge reader; typesVerticesNullEdge() looks
+        * like the intended follow-up call - confirm.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @return The {@link org.apache.flink.graph.GraphCsvReader}
+        */
+       public GraphCsvReader typesEdges(Class<K> vertexKey) {
+               this.vertexKey = vertexKey;
+               this.edgeValue = null;
+               return this;
+       }
+
+       /**
+        * Specifies the types for the vertices fields and returns an instance
+        * of Graph. Edges are read as three fields (source ID, target ID,
+        * edge value) using the types stored by typesEdges().
+        *
+        * If no mapper is set and a vertices file was given, vertices are read
+        * from that file with the types passed here. If a mapper is set, the
+        * vertices file is not read and the mapper is passed to
+        * Graph.fromTupleDataSet instead.
+        *
+        * NOTE(review): returns null when the reader has neither a vertices
+        * file nor a mapper; callers in that situation need the
+        * single-argument overload.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @param vertexValue The type of Vertex Value in the Graph.
+        * @return The {@link org.apache.flink.graph.Graph}, or null as noted
+        * above.
+        */
+       public Graph<K, VV, EV> typesVertices(Class vertexKey, Class 
vertexValue) {
+               DataSet<Tuple3<K, K, EV>> edges = 
this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue);
+               if(mapper == null && this.VertexReader != null) {
+               DataSet<Tuple2<K, VV>> vertices = 
this.VertexReader.types(vertexKey, vertexValue);
+               return Graph.fromTupleDataSet(vertices, edges, 
executionContext);
+               } else if(this.mapper != null) {
+               return Graph.fromTupleDataSet(edges, this.mapper, 
executionContext);
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Specifies the types for the vertices fields and returns an instance
+        * of Graph.
+        * This method is overloaded for the case when vertices don't have a
+        * value. The Graph is built from the edges only; the vertices file
+        * and the mapper, if any, are not used.
+        *
+        * NOTE(review): the vertexKey argument is not read; the key type
+        * stored by typesEdges() is used for the edges.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @return The {@link org.apache.flink.graph.Graph}
+        */
+       public Graph<K, NullValue, EV> typesVertices(Class vertexKey) {
+               DataSet<Tuple3<K, K, EV>> edges = 
this.EdgeReader.types(this.vertexKey, this.vertexKey, this.edgeValue);
+               return Graph.fromTupleDataSet(edges, executionContext);
+       }
+
+       /**
+        * Specifies the types for the vertices fields and returns an instance
+        * of Graph when Edges don't have a value. Edges are read as two
+        * fields (source ID, target ID) and each is given NullValue as its
+        * edge value.
+        *
+        * If no mapper is set and a vertices file was given, vertices are read
+        * from that file with the types passed here. If a mapper is set, the
+        * vertices file is not read and the mapper is passed to
+        * Graph.fromTupleDataSet instead.
+        *
+        * NOTE(review): returns null when the reader has neither a vertices
+        * file nor a mapper; callers in that situation need the
+        * single-argument overload.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @param vertexValue The type of Vertex Value in the Graph.
+        * @return The {@link org.apache.flink.graph.Graph}, or null as noted
+        * above.
+        */
+       public Graph<K, VV, NullValue> typesVerticesNullEdge(Class vertexKey, 
Class vertexValue) {
+               DataSet<Tuple3<K, K, NullValue>> edges= 
this.EdgeReader.types(this.vertexKey, this.vertexKey)
+                               .map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, 
NullValue>>() {
+                                       public Tuple3<K, K, NullValue> 
map(Tuple2<K, K> value) {
+                                               return new Tuple3<K, K, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                       }
+                               });
+               if(this.mapper == null && this.VertexReader != null) {
+               DataSet<Tuple2<K, VV>> vertices = 
this.VertexReader.types(vertexKey, vertexValue);
+               return Graph.fromTupleDataSet(vertices, edges, 
executionContext);
+               } else if (this.mapper != null) {
+                       return Graph.fromTupleDataSet(edges, mapper, 
executionContext);
+               } else {
+               return null;
+               }
+       }
+
+       /**
+        * Specifies the types for the vertices fields and returns an instance
+        * of Graph when Edges don't have a value.
+        * This method is overloaded for the case when vertices don't have a
+        * value. Edges are read as two fields (source ID, target ID) and each
+        * is given NullValue as its edge value; the vertices file and the
+        * mapper, if any, are not used.
+        *
+        * NOTE(review): the vertexKey argument is not read; the key type
+        * stored by typesEdges() is used for the edges.
+        *
+        * @param vertexKey The type of Vertex ID in the Graph.
+        * @return The {@link org.apache.flink.graph.Graph}
+        */
+       public Graph<K, NullValue, NullValue> typesVerticesNullEdge(Class 
vertexKey) {
+               DataSet<Tuple3<K, K, NullValue>> edges= 
this.EdgeReader.types(this.vertexKey, this.vertexKey)
+                               .map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, 
NullValue>>() {
+                                       public Tuple3<K, K, NullValue> 
map(Tuple2<K, K> value) {
+                                               return new Tuple3<K, K, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+                                       }
+                               });
+               return Graph.fromTupleDataSet(edges, executionContext);
+       }
+
+       /**
+        * Configures the delimiter that separates rows for the CSV reader used
+        * to read the edges. ({@code '\n'}) is used by default.
+        *
+        * @param delimiter The delimiter that separates the rows.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader lineDelimiterEdges(String delimiter) {
+               this.EdgeReader.lineDelimiter(delimiter);
+               return this;
+       }
+
+       /**
+        * Configures the delimiter that separates rows for the CSV reader used
+        * to read the vertices. ({@code '\n'}) is used by default.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param delimiter The delimiter that separates the rows.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader lineDelimiterVertices(String delimiter) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.lineDelimiter(delimiter);
+               }
+               return this;
+       }
+
+       /**
+        * Configures the delimiter that separates fields in a row for the CSV
+        * reader used to read the vertices. ({@code ','}) is used by default.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param delimiter The delimiter that separates the fields in a row.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader fieldDelimiterVertices(String delimiter) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.fieldDelimiter(delimiter);
+               }
+               return this;
+       }
+
+       /**
+        * Configures the delimiter that separates fields in a row for the CSV
+        * reader used to read the edges. ({@code ','}) is used by default.
+        *
+        * @param delimiter The delimiter that separates the fields in a row.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader fieldDelimiterEdges(String delimiter) {
+               this.EdgeReader.fieldDelimiter(delimiter);
+               return this;
+       }
+
+       /**
+        * Enables quoted String parsing for the Edge CSV reader. Field
+        * delimiters in quoted Strings are ignored.
+        * A String is parsed as quoted if it starts and ends with a quoting
+        * character and as unquoted otherwise.
+        * Leading or trailing whitespaces are not allowed.
+        *
+        * @param quoteCharacter The character which is used as quoting
+        * character.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) {
+               this.EdgeReader.parseQuotedStrings(quoteCharacter);
+               return this;
+       }
+
+       /**
+        * Enables quoted String parsing for the Vertex CSV reader. Field
+        * delimiters in quoted Strings are ignored.
+        * A String is parsed as quoted if it starts and ends with a quoting
+        * character and as unquoted otherwise.
+        * Leading or trailing whitespaces are not allowed.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param quoteCharacter The character which is used as quoting
+        * character.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.parseQuotedStrings(quoteCharacter);
+               }
+               return this;
+       }
+
+       /**
+        * Configures the string that starts comments for the Vertex CSV
+        * reader.
+        * By default comments will be treated as invalid lines.
+        * This function only recognizes comments which start at the beginning
+        * of the line!
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param commentPrefix The string that starts the comments.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.ignoreComments(commentPrefix);
+               }
+               return this;
+       }
+
+       /**
+        * Configures the string that starts comments for the Edge CSV reader.
+        * By default comments will be treated as invalid lines.
+        * This function only recognizes comments which start at the beginning
+        * of the line!
+        *
+        * @param commentPrefix The string that starts the comments.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreCommentsEdges(String commentPrefix) {
+               this.EdgeReader.ignoreComments(commentPrefix);
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing vertices data
+        * should be included and which should be skipped. The parser will
+        * look at the first {@code n} fields, where {@code n} is the length
+        * of the boolean array. The parser will skip over all fields where
+        * the boolean value at the corresponding position in the array is
+        * {@code false}. The result contains the fields where the
+        * corresponding position in the boolean array is {@code true}.
+        * The number of fields in the result is consequently equal to the
+        * number of times that {@code true} occurs in the fields array.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param vertexFields The array of flags that describes which fields
+        * are to be included from the CSV file for vertices.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.includeFields(vertexFields);
+               }
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing edges data
+        * should be included and which should be skipped. The parser will
+        * look at the first {@code n} fields, where {@code n} is the length
+        * of the boolean array. The parser will skip over all fields where
+        * the boolean value at the corresponding position in the array is
+        * {@code false}. The result contains the fields where the
+        * corresponding position in the boolean array is {@code true}.
+        * The number of fields in the result is consequently equal to the
+        * number of times that {@code true} occurs in the fields array.
+        *
+        * @param edgeFields The array of flags that describes which fields
+        * are to be included from the CSV file for edges.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) {
+               this.EdgeReader.includeFields(edgeFields);
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing vertices data
+        * should be included and which should be skipped. The positions in
+        * the string (read from position 0 to its length) define whether the
+        * field at the corresponding position in the CSV schema should be
+        * included.
+        * The parser will look at the first {@code n} fields, where {@code n}
+        * is the length of the mask string.
+        * The parser will skip over all fields where the character at the
+        * corresponding position in the string is {@code '0'}, {@code 'F'},
+        * or {@code 'f'} (representing the value {@code false}). The result
+        * contains the fields where the character at the corresponding
+        * position in the string is {@code '1'}, {@code 'T'}, or {@code 't'}
+        * (representing the value {@code true}).
+        * Does nothing if this reader has no vertices file.
+        *
+        * @param mask The string mask defining which fields to include and
+        * which to skip.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsVertices(String mask) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.includeFields(mask);
+               }
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing edges data
+        * should be included and which should be skipped. The positions in
+        * the string (read from position 0 to its length) define whether the
+        * field at the corresponding position in the CSV schema should be
+        * included.
+        * The parser will look at the first {@code n} fields, where {@code n}
+        * is the length of the mask string.
+        * The parser will skip over all fields where the character at the
+        * corresponding position in the string is {@code '0'}, {@code 'F'},
+        * or {@code 'f'} (representing the value {@code false}). The result
+        * contains the fields where the character at the corresponding
+        * position in the string is {@code '1'}, {@code 'T'}, or {@code 't'}
+        * (representing the value {@code true}).
+        *
+        * @param mask The string mask defining which fields to include and
+        * which to skip.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsEdges(String mask) {
+               this.EdgeReader.includeFields(mask);
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing vertices data
+        * should be included and which should be skipped. The bits in the
+        * value (read from least significant to most significant) define
+        * whether the field at the corresponding position in the CSV schema
+        * should be included.
+        * The parser will look at the first {@code n} fields, where {@code n}
+        * is the position of the most significant non-zero bit.
+        * The parser will skip over all fields where the corresponding bit is
+        * zero, and include the fields where the corresponding bit is one.
+        * Does nothing if this reader has no vertices file.
+        * <p>
+        * Examples:
+        * <ul>
+        *   <li>A mask of {@code 0x7} would include the first three
+        *       fields.</li>
+        *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the
+        *       first field, include fields two and three, skip fields four
+        *       and five, and include field six.</li>
+        * </ul>
+        *
+        * @param mask The bit mask defining which fields to include and which
+        * to skip.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsVertices(long mask) {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.includeFields(mask);
+               }
+               return this;
+       }
+
+       /**
+        * Configures which fields of the CSV file containing edges data
+        * should be included and which should be skipped. The bits in the
+        * value (read from least significant to most significant) define
+        * whether the field at the corresponding position in the CSV schema
+        * should be included.
+        * The parser will look at the first {@code n} fields, where {@code n}
+        * is the position of the most significant non-zero bit.
+        * The parser will skip over all fields where the corresponding bit is
+        * zero, and include the fields where the corresponding bit is one.
+        * <p>
+        * Examples:
+        * <ul>
+        *   <li>A mask of {@code 0x7} would include the first three
+        *       fields.</li>
+        *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the
+        *       first field, include fields two and three, skip fields four
+        *       and five, and include field six.</li>
+        * </ul>
+        *
+        * @param mask The bit mask defining which fields to include and which
+        * to skip.
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader includeFieldsEdges(long mask) {
+               this.EdgeReader.includeFields(mask);
+               return this;
+       }
+
+       /**
+        * Sets the CSV reader for the Edges file to ignore the first line.
+        * This is useful for files that contain a header line.
+        *
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreFirstLineEdges() {
+               this.EdgeReader.ignoreFirstLine();
+               return this;
+       }
+
+       /**
+        * Sets the CSV reader for the Vertices file to ignore the first line.
+        * This is useful for files that contain a header line.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreFirstLineVertices() {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.ignoreFirstLine();
+               }
+               return this;
+       }
+
+       /**
+        * Sets the CSV reader for the Edges file to ignore any invalid lines.
+        * This is useful for files that contain an empty line at the end,
+        * multiple header lines or comments. This would throw an exception
+        * otherwise.
+        *
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreInvalidLinesEdges() {
+               this.EdgeReader.ignoreInvalidLines();
+               return this;
+       }
+
+       /**
+        * Sets the CSV reader for the Vertices file to ignore any invalid
+        * lines.
+        * This is useful for files that contain an empty line at the end,
+        * multiple header lines or comments. This would throw an exception
+        * otherwise.
+        * Does nothing if this reader has no vertices file.
+        *
+        * @return The GraphCsvReader instance itself, to allow for fluent
+        * function chaining.
+        */
+       public GraphCsvReader ignoreInvalidLinesVertices() {
+               if(this.VertexReader !=null) {
+                       this.VertexReader.ignoreInvalidLines();
+               }
+               return this;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 2b17a14..d8abd4e 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
@@ -59,14 +57,8 @@ public class ConnectedComponents implements 
ProgramDescription {
 
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-               Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, 
new MapFunction<Long, Long>() {
-                       @Override
-                       public Long map(Long value) throws Exception {
-                               return value;
-                       }
-               }, env);
+               // build the graph with the getGraph utility method
+               Graph<Long, Long, NullValue> graph = 
ConnectedComponents.getGraph(env);
 
                DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
                                .run(new 
GSAConnectedComponents(maxIterations)).getVertices();
@@ -118,24 +110,29 @@ public class ConnectedComponents implements 
ProgramDescription {
 
                return true;
        }
-
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-
-               if(fileOutput) {
-                       return env.readCsvFile(edgeInputPath)
-                                       .ignoreComments("#")
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class)
-                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
-                                               @Override
-                                               public Edge<Long, NullValue> 
map(Tuple2<Long, Long> value) throws Exception {
-                                                       return new Edge<Long, 
NullValue>(value.f0, value.f1, NullValue.getInstance());
+@SuppressWarnings("unchecked")
+       private static Graph<Long, Long, NullValue> 
getGraph(ExecutionEnvironment env) {
+               Graph<Long, Long, NullValue> graph;
+               if(!fileOutput) {
+                       graph = 
Graph.fromDataSet(ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+                                       new MapFunction<Long, Long>() {
+
+                                               public Long map(Long label) {
+                                                       return label;
                                                }
-                                       });
+                                       }, env);
                } else {
-                       return 
ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+                       graph = Graph.fromCsvReader(edgeInputPath,new 
MapFunction<Long, Long>() {
+                               public Long map(Long label) {
+                                       return label;
+                               }
+                       }, env).ignoreCommentsEdges("#")
+                                       .fieldDelimiterEdges("\t")
+                                       .lineDelimiterEdges("\n")
+                                       .typesEdges(Long.class)
+                                       .typesVerticesNullEdge(Long.class, 
Long.class);
+
                }
+               return graph;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index 9ea8fe2..cfa04e9 100755
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
@@ -30,7 +29,6 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example shows how to use Gelly's Gather-Sum-Apply iterations.
@@ -61,9 +59,7 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
 
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, 
new InitVertices(srcVertexId), env);
+               Graph<Long, Double, Double> graph = 
GSASingleSourceShortestPaths.getGraph(env);
 
                // Execute the GSA iteration
                Graph<Long, Double, Double> result = 
graph.runGatherSumApplyIteration(
@@ -113,7 +109,7 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
                public Double gather(Neighbor<Double, Double> neighbor) {
                        return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
@@ -121,7 +117,7 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
                public Double sum(Double newValue, Double currentValue) {
                        return Math.min(newValue, currentValue);
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class UpdateDistance extends ApplyFunction<Long, 
Double, Double> {
@@ -172,15 +168,15 @@ public class GSASingleSourceShortestPaths implements 
ProgramDescription {
                return true;
        }
 
-       private static DataSet<Edge<Long, Double>> 
getEdgeDataSet(ExecutionEnvironment env) {
+       @SuppressWarnings("unchecked")
+       private static Graph<Long, Double, Double> 
getGraph(ExecutionEnvironment env) {
                if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .fieldDelimiter("\t")
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+                       return Graph.fromCsvReader(edgesInputPath, new 
InitVertices(srcVertexId), env).fieldDelimiterEdges("\t")
+                                       .lineDelimiterEdges("\n")
+                                       .typesEdges(Long.class, Double.class)
+                                       .typesVertices(Long.class, 
Double.class);
                } else {
-                       return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+                       return 
Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new 
InitVertices(srcVertexId), env);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 6c4d0c2..8d9beeb 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
@@ -57,7 +56,7 @@ public class GraphMetrics implements ProgramDescription {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
                /** create the graph **/
-               Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(getEdgesDataSet(env), env);
+               Graph<Long, NullValue, NullValue> graph = 
GraphMetrics.getGraph(env);
                
                /** get the number of vertices **/
                long numVertices = graph.numberOfVertices();
@@ -150,21 +149,17 @@ public class GraphMetrics implements ProgramDescription {
                return true;
        }
 
-       @SuppressWarnings("serial")
-       private static DataSet<Edge<Long, NullValue>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       
.lineDelimiter("\n").fieldDelimiter("\t")
-                                       .types(Long.class, Long.class).map(
-                                                       new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-                                                               public 
Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-                                                                       return 
new Edge<Long, NullValue>(value.f0, value.f1, 
-                                                                               
        NullValue.getInstance());
-                                                               }
-                                       });
+       @SuppressWarnings({"serial", "unchecked"})
+       private static Graph<Long, NullValue, NullValue> 
getGraph(ExecutionEnvironment env) {
+               if(fileOutput) {
+                       return Graph.fromCsvReader(edgesInputPath, env)
+                                       .lineDelimiterEdges("\n")
+                                       .fieldDelimiterEdges("\t")
+                                       .typesEdges(Long.class)
+                                       .typesVerticesNullEdge(Long.class);
+
                } else {
-                       return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+                       return 
Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index cc672b2..adf9c02 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -31,8 +31,6 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example illustrates the usage of vertex-centric iteration's
@@ -77,18 +75,12 @@ public class IncrementalSSSP implements ProgramDescription {
 
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
-               DataSet<Vertex<Long, Double>> vertices = 
getVerticesDataSet(env);
-
-               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-               DataSet<Edge<Long, Double>> edgesInSSSP = 
getEdgesinSSSPDataSet(env);
-
                Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
 
-               Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, 
edges, env);
+               Graph<Long, Double, Double> graph = 
IncrementalSSSP.getGraph(env);
 
                // Assumption: all minimum weight paths are kept
-               Graph<Long, Double, Double> ssspGraph = 
Graph.fromDataSet(vertices, edgesInSSSP, env);
+               Graph<Long, Double, Double> ssspGraph = 
IncrementalSSSP.getSSSPGraph(env);
 
                // remove the edge
                graph.removeEdge(edgeToBeRemoved);
@@ -96,7 +88,7 @@ public class IncrementalSSSP implements ProgramDescription {
                // configure the iteration
                VertexCentricConfiguration parameters = new 
VertexCentricConfiguration();
 
-               if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+               if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
 
                        parameters.setDirection(EdgeDirection.IN);
                        parameters.setOptDegrees(true);
@@ -110,24 +102,20 @@ public class IncrementalSSSP implements 
ProgramDescription {
                        // Emit results
                        if(fileOutput) {
                                resultedVertices.writeAsCsv(outputPath, "\n", 
",");
-
-                               // since file sinks are lazy, we trigger the 
execution explicitly
-                               env.execute("Incremental SSSP Example");
                        } else {
                                resultedVertices.print();
                        }
 
+                       env.execute("Incremental SSSP Example");
                } else {
                        // print the vertices
                        if(fileOutput) {
-                               vertices.writeAsCsv(outputPath, "\n", ",");
-
-                               // since file sinks are lazy, we trigger the 
execution explicitly
-                               env.execute("Incremental SSSP Example");
+                               graph.getVertices().writeAsCsv(outputPath, 
"\n", ",");
                        } else {
-                               vertices.print();
+                               graph.getVertices().print();
                        }
 
+                       env.execute("Incremental SSSP Example");
                }
        }
 
@@ -251,45 +239,31 @@ public class IncrementalSSSP implements 
ProgramDescription {
                return true;
        }
 
-       private static DataSet<Vertex<Long, Double>> 
getVerticesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(verticesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Double.class)
-                                       .map(new Tuple2ToVertexMap<Long, 
Double>());
+       @SuppressWarnings("unchecked")
+       private static Graph<Long, Double, Double> 
getGraph(ExecutionEnvironment env) {
+               if(fileOutput) {
+                       return Graph.fromCsvReader(verticesInputPath, 
edgesInputPath, env).lineDelimiterEdges("\n")
+                                       .typesEdges(Long.class, Double.class)
+                                       .typesVertices(Long.class, 
Double.class);
                } else {
                        System.err.println("Usage: IncrementalSSSP <vertex 
path> <edge path> <edges in SSSP> " +
                                        "<src id edge to be removed> <trg id 
edge to be removed> <val edge to be removed> " +
                                        "<output path> <max iterations>");
-                       return IncrementalSSSPData.getDefaultVertexDataSet(env);
+                       return 
Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), 
IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
                }
        }
 
-       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
-               } else {
-                       System.err.println("Usage: IncrementalSSSP <vertex 
path> <edge path> <edges in SSSP> " +
-                                       "<src id edge to be removed> <trg id 
edge to be removed> <val edge to be removed> " +
-                                       "<output path> <max iterations>");
-                       return IncrementalSSSPData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-       private static DataSet<Edge<Long, Double>> 
getEdgesinSSSPDataSet(ExecutionEnvironment env) {
-               if (fileOutput) {
-                       return env.readCsvFile(edgesInSSSPInputPath)
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class, 
Double.class)
-                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+       @SuppressWarnings("unchecked")
+       private static Graph<Long, Double, Double> 
getSSSPGraph(ExecutionEnvironment env) {
+               if(fileOutput) {
+                       return Graph.fromCsvReader(verticesInputPath, 
edgesInSSSPInputPath, env).lineDelimiterEdges("\n")
+                                       .typesEdges(Long.class, Double.class)
+                                       .typesVertices(Long.class, 
Double.class);
                } else {
                        System.err.println("Usage: IncrementalSSSP <vertex 
path> <edge path> <edges in SSSP> " +
                                        "<src id edge to be removed> <trg id 
edge to be removed> <val edge to be removed> " +
                                        "<output path> <max iterations>");
-                       return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+                       return 
Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), 
IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index 391ebaf..ef09bff 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -196,4 +196,4 @@ public class SingleSourceShortestPaths implements 
ProgramDescription {
        public String getDescription() {
                return "Vertex-centric Single Source Shortest Paths";
        }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
new file mode 100644
index 0000000..2b78d32
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import com.google.common.base.Charsets;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
+
+       /**
+        * Creates the test case for the given execution mode, which is passed
+        * on unchanged to the {@link MultipleProgramsTestBase} constructor.
+        *
+        * @param mode the execution mode handed to the base class
+        */
+       public GraphCreationWithCsvITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       private String expectedResult;
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCreateWithCsvFile() throws Exception {
+               // Builds a graph from two CSV files: one with vertex data, one with edge data.
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final String vertexCsv = "1,1\n" + "2,2\n" + "3,3\n";
+               final String verticesPath = createTempFile(vertexCsv).getPath().toString();
+
+               final String edgeCsv = "1,2,ot\n" + "3,2,tt\n" + "3,1,to\n";
+               final String edgesPath = createTempFile(edgeCsv).getPath().toString();
+
+               Graph<Long, Long, String> graph = Graph.fromCsvReader(verticesPath, edgesPath, env)
+                               .typesEdges(Long.class, String.class)
+                               .typesVertices(Long.class, Long.class);
+
+               List<Triplet<Long, Long, String>> triplets = graph.getTriplets().collect();
+
+               expectedResult = "1,2,1,2,ot\n" + "3,2,3,2,tt\n" + "3,1,3,1,to\n";
+               compareResultAsTuples(triplets, expectedResult);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCsvWithNullEdge() throws Exception {
+               // Reads vertices and edges from CSV files; the edges file has no value
+               // column, so the expected triplets carry a null edge value.
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final String vertexCsv = "1,one\n" + "2,two\n" + "3,three\n";
+               final String edgeCsv = "1,2\n" + "3,2\n" + "3,1\n";
+
+               final String verticesPath = createTempFile(vertexCsv).getPath().toString();
+               final String edgesPath = createTempFile(edgeCsv).getPath().toString();
+
+               Graph<Long, String, NullValue> graph = Graph.fromCsvReader(verticesPath, edgesPath, env)
+                               .typesEdges(Long.class)
+                               .typesVerticesNullEdge(Long.class, String.class);
+
+               List<Triplet<Long, String, NullValue>> triplets = graph.getTriplets().collect();
+
+               expectedResult = "1,2,one,two,(null)\n" + "3,2,three,two,(null)\n" + "3,1,three,one,(null)\n";
+               compareResultAsTuples(triplets, expectedResult);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCsvWithConstantValueMapper() throws Exception {
+               // Reads only an edges CSV file; the vertex values come from a mapper
+               // that assigns the same Double constant to every vertex.
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final String edgeCsv = "1,2,ot\n" + "3,2,tt\n" + "3,1,to\n";
+               final String edgesPath = createTempFile(edgeCsv).getPath().toString();
+
+               Graph<Long, Double, String> graph = Graph.fromCsvReader(edgesPath, new AssignDoubleValueMapper(), env)
+                               .typesEdges(Long.class, String.class)
+                               .typesVertices(Long.class, Double.class);
+
+               List<Triplet<Long, Double, String>> triplets = graph.getTriplets().collect();
+
+               expectedResult = "1,2,0.1,0.1,ot\n" + "3,1,0.1,0.1,to\n" + "3,2,0.1,0.1,tt\n";
+               compareResultAsTuples(triplets, expectedResult);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCreateWithOnlyEdgesCsvFile() throws Exception {
+               // Builds the graph from a single edges CSV file and skips its header line
+               // with ignoreFirstLineEdges(). ignoreCommentsVertices() is called as well,
+               // although no vertices file is given.
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final String edgeCsv = "header\n1,2,ot\n" + "3,2,tt\n" + "3,1,to\n";
+               final String edgesPath = createTempFile(edgeCsv).getPath().toString();
+
+               Graph<Long, NullValue, String> graph = Graph.fromCsvReader(edgesPath, env)
+                               .ignoreFirstLineEdges()
+                               .ignoreCommentsVertices("hi")
+                               .typesEdges(Long.class, String.class)
+                               .typesVertices(Long.class);
+
+               List<Triplet<Long, NullValue, String>> triplets = graph.getTriplets().collect();
+
+               expectedResult = "1,2,(null),(null),ot\n" + "3,2,(null),(null),tt\n" + "3,1,(null),(null),to\n";
+               compareResultAsTuples(triplets, expectedResult);
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testCreateCsvFileDelimiterConfiguration() throws Exception {
+               // Reads a vertices and an edges CSV file that use custom delimiters and a
+               // header line each. Exercises fieldDelimiterEdges, fieldDelimiterVertices,
+               // lineDelimiterEdges, ignoreFirstLineEdges and ignoreFirstLineVertices.
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+               final String vertexCsv = "header\n1;1\n" + "2;2\n" + "3;3\n";
+               final String verticesPath = createTempFile(vertexCsv).getPath().toString();
+
+               final String edgeCsv = "header|1:2:ot|" + "3:2:tt|" + "3:1:to|";
+               final String edgesPath = createTempFile(edgeCsv).getPath().toString();
+
+               Graph<Long, Long, String> graph = Graph.fromCsvReader(verticesPath, edgesPath, env)
+                               .ignoreFirstLineEdges()
+                               .ignoreFirstLineVertices()
+                               .fieldDelimiterEdges(":")
+                               .fieldDelimiterVertices(";")
+                               .lineDelimiterEdges("|")
+                               .typesEdges(Long.class, String.class)
+                               .typesVertices(Long.class, Long.class);
+
+               List<Triplet<Long, Long, String>> triplets = graph.getTriplets().collect();
+
+               expectedResult = "1,2,1,2,ot\n" + "3,2,3,2,tt\n" + "3,1,3,1,to\n";
+               compareResultAsTuples(triplets, expectedResult);
+       }
+
+       
/*----------------------------------------------------------------------------------------------------------------*/
+       /**
+        * Mapper that ignores its input and assigns the constant value 0.1 to
+        * every element it is applied to.
+        */
+       @SuppressWarnings("serial")
+       private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+               @Override
+               public Double map(Long value) {
+                       return 0.1d;
+               }
+       }
+
+       /**
+        * Writes the given content, UTF-8 encoded, to a newly created temporary
+        * file and returns an input split that spans the whole file. The file
+        * is registered for deletion when the JVM exits.
+        *
+        * @param content the text to write to the temporary file
+        * @return a split starting at offset 0 with the length of the written file
+        * @throws IOException if the file cannot be created or written
+        */
+       private FileInputSplit createTempFile(String content) throws IOException {
+               File tempFile = File.createTempFile("test_contents", "tmp");
+               tempFile.deleteOnExit();
+
+               OutputStreamWriter wrt = new OutputStreamWriter(
+                               new FileOutputStream(tempFile), Charsets.UTF_8);
+               try {
+                       wrt.write(content);
+               } finally {
+                       // Close even when the write fails, so the file handle is not leaked.
+                       wrt.close();
+               }
+
+               return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0,
+                               tempFile.length(), new String[] {"localhost"});
+       }
+}

Reply via email to