[hotfix] [gelly] Improve generic type formatting

Add a space after the comma between generic type parameters, and after commas in the argument lists on the touched lines.

For example, 'Vertex<K,VV>' has been updated to 'Vertex<K, VV>'.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53716a4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53716a4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53716a4d

Branch: refs/heads/master
Commit: 53716a4da1732ee66c7906772bd5bfcd2218b3e1
Parents: b408d61
Author: Greg Hogan <[email protected]>
Authored: Tue Jan 17 14:25:37 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Tue Jan 17 15:38:24 2017 -0500

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/Graph500.java    |  2 +-
 .../graph/examples/data/TriangleCountData.java  | 10 ++--
 .../graph/library/TriangleEnumeratorITCase.java |  6 +--
 .../test/examples/IncrementalSSSPITCase.java    |  2 +-
 .../main/java/org/apache/flink/graph/Graph.java | 30 +++++------
 .../asm/translate/TranslateEdgeValues.java      |  2 +-
 .../graph/asm/translate/TranslateGraphIds.java  |  2 +-
 .../graph/generator/AbstractGraphGenerator.java |  2 +-
 .../flink/graph/generator/CompleteGraph.java    | 12 ++---
 .../flink/graph/generator/CycleGraph.java       |  2 +-
 .../flink/graph/generator/EmptyGraph.java       | 10 ++--
 .../flink/graph/generator/GraphGenerator.java   |  4 +-
 .../graph/generator/GraphGeneratorUtils.java    | 18 +++----
 .../apache/flink/graph/generator/GridGraph.java | 20 +++----
 .../apache/flink/graph/generator/PathGraph.java |  2 +-
 .../apache/flink/graph/generator/RMatGraph.java | 14 ++---
 .../graph/generator/SingletonEdgeGraph.java     | 12 ++---
 .../apache/flink/graph/generator/StarGraph.java | 14 ++---
 .../flink/graph/library/TriangleEnumerator.java | 10 ++--
 .../directed/LocalClusteringCoefficient.java    |  2 +-
 .../undirected/LocalClusteringCoefficient.java  |  4 +-
 .../graph/pregel/VertexCentricIteration.java    |  2 +-
 .../graph/spargel/ScatterGatherIteration.java   |  4 +-
 .../proxy/GraphAlgorithmWrappingDataSet.java    |  2 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java | 16 +++---
 .../graph/asm/simple/directed/SimplifyTest.java |  4 +-
 .../asm/simple/undirected/SimplifyTest.java     |  6 +--
 .../graph/asm/translate/TranslateTest.java      |  2 +-
 .../graph/generator/CompleteGraphTest.java      | 10 ++--
 .../flink/graph/generator/CycleGraphTest.java   | 10 ++--
 .../flink/graph/generator/EmptyGraphTest.java   | 10 ++--
 .../flink/graph/generator/GridGraphTest.java    |  6 +--
 .../graph/generator/HypercubeGraphTest.java     |  6 +--
 .../flink/graph/generator/PathGraphTest.java    | 10 ++--
 .../flink/graph/generator/RMatGraphTest.java    |  8 +--
 .../graph/generator/SingletonEdgeGraphTest.java | 10 ++--
 .../flink/graph/generator/StarGraphTest.java    | 10 ++--
 .../apache/flink/graph/generator/TestUtils.java |  6 +--
 .../graph/library/link_analysis/HITSTest.java   |  4 +-
 .../flink/graph/pregel/PregelCompilerTest.java  | 24 ++++-----
 .../graph/pregel/PregelTranslationTest.java     | 16 +++---
 .../graph/spargel/SpargelCompilerTest.java      | 24 ++++-----
 .../graph/spargel/SpargelTranslationTest.java   |  4 +-
 .../test/ScatterGatherConfigurationITCase.java  |  6 +--
 .../apache/flink/graph/test/TestGraphUtils.java | 20 +++----
 .../test/operations/FromCollectionITCase.java   | 13 ++---
 .../test/operations/GraphCreationITCase.java    | 18 +++----
 .../operations/GraphCreationWithCsvITCase.java  | 16 +++---
 .../GraphCreationWithMapperITCase.java          | 16 +++---
 .../test/operations/GraphMutationsITCase.java   | 56 ++++++++++----------
 .../test/operations/GraphOperationsITCase.java  | 20 +++----
 .../test/operations/JoinWithEdgesITCase.java    | 34 ++++++------
 .../test/operations/JoinWithVerticesITCase.java | 24 ++++-----
 .../operations/ReduceOnEdgesMethodsITCase.java  | 36 ++++++-------
 .../ReduceOnNeighborMethodsITCase.java          | 30 +++++------
 55 files changed, 332 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
index 51ef66f..3509d2e 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -103,7 +103,7 @@ public class Graph500 {
                        graph = graph.run(new Simplify<LongValue, NullValue, 
NullValue>(clipAndFlip));
                }
 
-               DataSet<Tuple2<LongValue,LongValue>> edges = graph
+               DataSet<Tuple2<LongValue, LongValue>> edges = graph
                        .getEdges()
                        .project(0, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
index a14010f..cf3a715 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -53,11 +53,11 @@ public class TriangleCountData {
 
        public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
 
-       public static List<Tuple3<Long,Long,Long>> getListOfTriangles() {
-               ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
-               ret.add(new Tuple3<>(1L,2L,3L));
-               ret.add(new Tuple3<>(2L,3L,6L));
-               ret.add(new Tuple3<>(4L,3L,5L));
+       public static List<Tuple3<Long, Long, Long>> getListOfTriangles()       
{
+               ArrayList<Tuple3<Long, Long, Long>> ret = new ArrayList<>(3);
+               ret.add(new Tuple3<>(1L, 2L, 3L));
+               ret.add(new Tuple3<>(2L, 3L, 6L));
+               ret.add(new Tuple3<>(4L, 3L, 5L));
                return ret;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
index 9550405..176a7e1 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -46,11 +46,11 @@ public class TriangleEnumeratorITCase extends 
MultipleProgramsTestBase {
                Graph<Long, NullValue, NullValue> graph = 
Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
                                env);
 
-               List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new 
TriangleEnumerator<Long, NullValue, NullValue>()).collect();
-               List<Tuple3<Long,Long,Long>> expectedResult = 
TriangleCountData.getListOfTriangles();
+               List<Tuple3<Long, Long, Long>> actualOutput = graph.run(new 
TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+               List<Tuple3<Long, Long, Long>> expectedResult = 
TriangleCountData.getListOfTriangles();
 
                Assert.assertEquals(expectedResult.size(), actualOutput.size());
-               for(Tuple3<Long,Long,Long> resultTriangle:actualOutput) {
+               for(Tuple3<Long, Long, Long> resultTriangle:actualOutput)       
{
                        
Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
index 24b8cf1..de92666 100644
--- 
a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
+++ 
b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -82,7 +82,7 @@ public class IncrementalSSSPITCase extends 
MultipleProgramsTestBase {
        public void testIncrementalSSSP() throws Exception {
                IncrementalSSSP.main(new String[]{verticesPath, edgesPath, 
edgesInSSSPPath,
                                IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, 
IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
-                               
IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, 
IncrementalSSSPData.NUM_VERTICES + ""});
+                               IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED, 
resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
                expected = IncrementalSSSPData.RESULTED_VERTICES;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 093b804..dae7a11 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -544,7 +544,7 @@ public class Graph<K, VV, EV> {
         * @param returnType the explicit return type.
         * @return a new graph
         */
-       public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, 
VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
+       public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, 
VV>, NV> mapper, TypeInformation<Vertex<K, NV>> returnType) {
                DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
                                new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() 
{
                                        private Vertex<K, NV> output = new 
Vertex<>();
@@ -588,7 +588,7 @@ public class Graph<K, VV, EV> {
         * @param returnType the explicit return type.
         * @return a new graph
         */
-       public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, 
NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
+       public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, 
NV> mapper, TypeInformation<Edge<K, NV>> returnType) {
                DataSet<Edge<K, NV>> mappedEdges = edges.map(
                        new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
                                private Edge<K, NV> output = new Edge<>();
@@ -924,7 +924,7 @@ public class Graph<K, VV, EV> {
                private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, 
degree);
 
                @SuppressWarnings("unused")
-               public void coGroup(Iterable<Vertex<K, VV>> vertex,     
Iterable<Edge<K, EV>> outEdges,
+               public void coGroup(Iterable<Vertex<K, VV>> vertex, 
Iterable<Edge<K, EV>> outEdges,
                                Collector<Tuple2<K, LongValue>> out) {
                        long count = 0;
                        for (Edge<K, EV> edge : outEdges) {
@@ -1217,7 +1217,7 @@ public class Graph<K, VV, EV> {
                        this.function = fun;
                }
 
-               public void coGroup(Iterable<Vertex<K, VV>> vertex,     final 
Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
+               public void coGroup(Iterable<Vertex<K, VV>> vertex,     final 
Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
                                Collector<T> out) throws Exception {
 
                        final Iterator<Edge<K, EV>> edgesIterator = new 
Iterator<Edge<K, EV>>() {
@@ -1416,9 +1416,9 @@ public class Graph<K, VV, EV> {
         */
        public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
 
-               DataSet<Edge<K,EV>> newEdgesDataSet = 
this.context.fromCollection(newEdges);
+               DataSet<Edge<K, EV>> newEdgesDataSet = 
this.context.fromCollection(newEdges);
 
-               DataSet<Edge<K,EV>> validNewEdges = 
this.getVertices().join(newEdgesDataSet)
+               DataSet<Edge<K, EV>> validNewEdges = 
this.getVertices().join(newEdgesDataSet)
                                .where(0).equalTo(0)
                                .with(new JoinVerticesWithEdgesOnSrc<K, VV, 
EV>()).name("Join with source")
                                .join(this.getVertices()).where(1).equalTo(0)
@@ -1516,7 +1516,7 @@ public class Graph<K, VV, EV> {
        }
 
        @ForwardedFieldsSecond("f0; f1; f2")
-       private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements 
JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
+       private static final class ProjectEdgeToBeRemoved<K, VV, EV> implements 
JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
                @Override
                public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) 
throws Exception {
                        return edge;
@@ -1559,12 +1559,12 @@ public class Graph<K, VV, EV> {
        public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) 
{
 
                DataSet<Edge<K, EV>> newEdges = 
getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
-                               .where(0,1).equalTo(0,1).with(new 
EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
+                               .where(0, 1).equalTo(0, 1).with(new 
EdgeRemovalCoGroup<K, EV>()).name("Remove edges");
 
                return new Graph<>(this.vertices, newEdges, context);
        }
 
-       private static final class EdgeRemovalCoGroup<K,EV> implements 
CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
+       private static final class EdgeRemovalCoGroup<K, EV> implements 
CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
 
                @Override
                public void coGroup(Iterable<Edge<K, EV>> edge, 
Iterable<Edge<K, EV>> edgeToBeRemoved,
@@ -1608,8 +1608,8 @@ public class Graph<K, VV, EV> {
         * @param graph the graph to perform difference with
         * @return a new graph where the common vertices and edges have been 
removed
         */
-       public Graph<K,VV,EV> difference(Graph<K,VV,EV> graph) {
-               DataSet<Vertex<K,VV>> removeVerticesData = graph.getVertices();
+       public Graph<K, VV, EV> difference(Graph<K, VV, EV> graph) {
+               DataSet<Vertex<K, VV>> removeVerticesData = graph.getVertices();
                return this.removeVertices(removeVerticesData);
        }
 
@@ -1688,7 +1688,7 @@ public class Graph<K, VV, EV> {
         * @param <EV>  edge value type
         */
        private static final class MatchingEdgeReducer<K, EV>
-                       implements CoGroupFunction<Edge<K,EV>, Edge<K,EV>, 
Edge<K, EV>> {
+                       implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, 
Edge<K, EV>> {
 
                @Override
                public void coGroup(Iterable<Edge<K, EV>> edgesLeft, 
Iterable<Edge<K, EV>> edgesRight, Collector<Edge<K, EV>> out)
@@ -2149,12 +2149,12 @@ public class Graph<K, VV, EV> {
 
                public void coGroup(Iterable<Vertex<K, VV>> vertex, 
Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
                                Collector<T> out) throws Exception {
-                       function.iterateNeighbors(vertex.iterator().next(),     
neighbors, out);
+                       function.iterateNeighbors(vertex.iterator().next(),     
neighbors, out);
                }
 
                @Override
                public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,     
function.getClass(), 3, null, null);
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,     
function.getClass(), 3, null, null);
                }
        }
 
@@ -2209,7 +2209,7 @@ public class Graph<K, VV, EV> {
 
                @Override
                public TypeInformation<T> getProducedType() {
-                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,     
function.getClass(), 3, null, null);
+                       return 
TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, 
function.getClass(), 3, null, null);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index b2b7594..c8000e4 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -39,7 +39,7 @@ public class TranslateEdgeValues<K, VV, OLD, NEW>
 extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 
        // Required configuration
-       private TranslateFunction<OLD,NEW> translator;
+       private TranslateFunction<OLD, NEW> translator;
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index e079a41..58cb7e2 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -41,7 +41,7 @@ public class TranslateGraphIds<OLD, NEW, VV, EV>
 extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 
        // Required configuration
-       private TranslateFunction<OLD,NEW> translator;
+       private TranslateFunction<OLD, NEW> translator;
 
        // Optional configuration
        private int parallelism = PARALLELISM_DEFAULT;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
index 7f37356..58266a5 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/AbstractGraphGenerator.java
@@ -27,7 +27,7 @@ implements GraphGenerator<K, VV, EV> {
        protected int parallelism = PARALLELISM_DEFAULT;
 
        @Override
-       public GraphGenerator<K,VV,EV> setParallelism(int parallelism) {
+       public GraphGenerator<K, VV, EV> setParallelism(int parallelism) {
                this.parallelism = parallelism;
 
                return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index 5339b14..a4996ab 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -58,14 +58,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                // Vertices
-               DataSet<Vertex<LongValue,NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+               DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
                LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, this.vertexCount - 1);
 
-               DataSet<Edge<LongValue,NullValue>> edges = env
+               DataSet<Edge<LongValue, NullValue>> edges = env
                        .fromParallelCollection(iterator, LongValue.class)
                                .setParallelism(parallelism)
                                .name("Edge iterators")
@@ -79,20 +79,20 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
 
        @ForwardedFields("*->f0")
        public class LinkVertexToAll
-       implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+       implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private final long vertexCount;
 
                private LongValue target = new LongValue();
 
-               private Edge<LongValue,NullValue> edge = new Edge<>(null, 
target, NullValue.getInstance());
+               private Edge<LongValue, NullValue> edge = new Edge<>(null, 
target, NullValue.getInstance());
 
                public LinkVertexToAll(long vertex_count) {
                        this.vertexCount = vertex_count;
                }
 
                @Override
-               public void flatMap(LongValue source, 
Collector<Edge<LongValue,NullValue>> out)
+               public void flatMap(LongValue source, Collector<Edge<LongValue, 
NullValue>> out)
                                throws Exception {
                        edge.f0 = source;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index 2671efe..ce8b467 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -51,7 +51,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                return new GridGraph(env)
                        .addDimension(vertexCount, true)
                        .setParallelism(parallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 05bfd89..7ec368b 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -60,16 +60,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                // Vertices
-               DataSet<Vertex<LongValue,NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+               DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
-               TypeInformation<Edge<LongValue,NullValue>> typeInformation = 
new TupleTypeInfo<>(
+               TypeInformation<Edge<LongValue, NullValue>> typeInformation = 
new TupleTypeInfo<>(
                        ValueTypeInfo.LONG_VALUE_TYPE_INFO, 
ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
 
-               DataSource<Edge<LongValue,NullValue>> edges = env
-                       
.fromCollection(Collections.<Edge<LongValue,NullValue>>emptyList(), 
typeInformation)
+               DataSource<Edge<LongValue, NullValue>> edges = env
+                       .fromCollection(Collections.<Edge<LongValue 
,NullValue>>emptyList(), typeInformation)
                                .setParallelism(parallelism)
                                .name("Empty edge set");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
index 8fece81..f972d98 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGenerator.java
@@ -39,7 +39,7 @@ public interface GraphGenerator<K, VV, EV> {
         *
         * @return generated graph
         */
-       Graph<K,VV,EV> generate();
+       Graph<K, VV, EV> generate();
 
        /**
         * Override the operator parallelism.
@@ -47,5 +47,5 @@ public interface GraphGenerator<K, VV, EV> {
         * @param parallelism operator parallelism
         * @return this
         */
-       GraphGenerator<K,VV,EV> setParallelism(int parallelism);
+       GraphGenerator<K, VV, EV> setParallelism(int parallelism);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index 01cb2d1..485394c 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -42,7 +42,7 @@ public class GraphGeneratorUtils {
         * @param vertexCount number of sequential vertex labels
         * @return {@link DataSet} of sequentially labeled {@link Vertex 
Vertices}
         */
-       public static DataSet<Vertex<LongValue,NullValue>> 
vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
+       public static DataSet<Vertex<LongValue, NullValue>> 
vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
                LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, vertexCount-1);
 
                DataSource<LongValue> vertexLabels = env
@@ -58,9 +58,9 @@ public class GraphGeneratorUtils {
 
        @ForwardedFields("*->f0")
        private static class CreateVertex
-       implements MapFunction<LongValue, Vertex<LongValue,NullValue>> {
+       implements MapFunction<LongValue, Vertex<LongValue, NullValue>> {
 
-               private Vertex<LongValue,NullValue> vertex = new Vertex<>(null, 
NullValue.getInstance());
+               private Vertex<LongValue, NullValue> vertex = new 
Vertex<>(null, NullValue.getInstance());
 
                @Override
                public Vertex<LongValue, NullValue> map(LongValue value)
@@ -84,8 +84,8 @@ public class GraphGeneratorUtils {
         *
         * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
         */
-       public static <K,EV> DataSet<Vertex<K,NullValue>> 
vertexSet(DataSet<Edge<K,EV>> edges, int parallelism) {
-               DataSet<Vertex<K,NullValue>> vertexSet = edges
+       public static <K, EV> DataSet<Vertex<K, NullValue>> 
vertexSet(DataSet<Edge<K, EV>> edges, int parallelism) {
+               DataSet<Vertex<K, NullValue>> vertexSet = edges
                        .flatMap(new EmitSrcAndTarget<K, EV>())
                                .setParallelism(parallelism)
                                .name("Emit source and target labels");
@@ -99,13 +99,13 @@ public class GraphGeneratorUtils {
        /**
         * @see Graph.EmitSrcAndTarget
         */
-       private static final class EmitSrcAndTarget<K,EV>
-       implements FlatMapFunction<Edge<K,EV>, Vertex<K,NullValue>> {
+       private static final class EmitSrcAndTarget<K, EV>
+       implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
 
-               private Vertex<K,NullValue> output = new Vertex<>(null, new 
NullValue());
+               private Vertex<K, NullValue> output = new Vertex<>(null, new 
NullValue());
 
                @Override
-               public void flatMap(Edge<K,EV> value, 
Collector<Vertex<K,NullValue>> out) throws Exception {
+               public void flatMap(Edge<K, EV> value, Collector<Vertex<K, 
NullValue>> out) throws Exception {
                        output.f0 = value.f0;
                        out.collect(output);
                        output.f0 = value.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 399a2f9..74ea764 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -44,7 +44,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
        private final ExecutionEnvironment env;
 
        // Required configuration
-       private List<Tuple2<Long,Boolean>> dimensions = new ArrayList<>();
+       private List<Tuple2<Long, Boolean>> dimensions = new ArrayList<>();
 
        private long vertexCount = 1;
 
@@ -84,18 +84,18 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                if (dimensions.isEmpty()) {
                        throw new RuntimeException("No dimensions added to GridGraph");
                }
 
                // Vertices
-               DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+               DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
                LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, this.vertexCount - 1);
 
-               DataSet<Edge<LongValue,NullValue>> edges = env
+               DataSet<Edge<LongValue, NullValue>> edges = env
                        .fromParallelCollection(iterator, LongValue.class)
                                .setParallelism(parallelism)
                                .name("Edge iterators")
@@ -109,23 +109,23 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
 
        @ForwardedFields("*->f0")
        public class LinkVertexToNeighbors
-       implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+       implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private long vertexCount;
 
-               private List<Tuple2<Long,Boolean>> dimensions;
+               private List<Tuple2<Long, Boolean>> dimensions;
 
                private LongValue target = new LongValue();
 
-               private Edge<LongValue,NullValue> edge = new Edge<>(null, 
target, NullValue.getInstance());
+               private Edge<LongValue, NullValue> edge = new Edge<>(null, 
target, NullValue.getInstance());
 
-               public LinkVertexToNeighbors(long vertexCount, 
List<Tuple2<Long,Boolean>> dimensions) {
+               public LinkVertexToNeighbors(long vertexCount, 
List<Tuple2<Long, Boolean>> dimensions) {
                        this.vertexCount = vertexCount;
                        this.dimensions = dimensions;
                }
 
                @Override
-               public void flatMap(LongValue source, 
Collector<Edge<LongValue,NullValue>> out)
+               public void flatMap(LongValue source, Collector<Edge<LongValue, 
NullValue>> out)
                                throws Exception {
                        edge.f0 = source;
                        long val = source.getValue();
@@ -136,7 +136,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
                        // the value in the remaining dimensions
                        long remainder = val;
 
-                       for (Tuple2<Long,Boolean> dimension : dimensions) {
+                       for (Tuple2<Long, Boolean> dimension : dimensions) {
                                increment /= dimension.f0;
 
                                // the index within this dimension

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index 1aec723..db5e6bf 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -51,7 +51,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                return new GridGraph(env)
                        .addDimension(vertexCount, false)
                        .setParallelism(parallelism)

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 8a17b13..2a80a37 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -139,7 +139,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 
1);
 
                // Edges
@@ -148,7 +148,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
                List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
                        .getRandomGenerables(edgeCount, cyclesPerEdge);
 
-               DataSet<Edge<LongValue,NullValue>> edges = env
+               DataSet<Edge<LongValue, NullValue>> edges = env
                        .fromCollection(generatorBlocks)
                                .name("Random generators")
                        .rebalance()
@@ -159,14 +159,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
                                .name("RMat graph edges");
 
                // Vertices
-               DataSet<Vertex<LongValue,NullValue>> vertices = 
GraphGeneratorUtils.vertexSet(edges, parallelism);
+               DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSet(edges, parallelism);
 
                // Graph
                return Graph.fromDataSet(vertices, edges, env);
        }
 
        private static final class GenerateEdges<T extends RandomGenerator>
-       implements FlatMapFunction<BlockInfo<T>, Edge<LongValue,NullValue>> {
+       implements FlatMapFunction<BlockInfo<T>, Edge<LongValue, NullValue>> {
 
                // Configuration
                private final long vertexCount;
@@ -190,9 +190,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
 
                private LongValue target = new LongValue();
 
-               private Edge<LongValue,NullValue> sourceToTarget = new 
Edge<>(source, target, NullValue.getInstance());
+               private Edge<LongValue, NullValue> sourceToTarget = new 
Edge<>(source, target, NullValue.getInstance());
 
-               private Edge<LongValue,NullValue> targetToSource = new 
Edge<>(target, source, NullValue.getInstance());
+               private Edge<LongValue, NullValue> targetToSource = new 
Edge<>(target, source, NullValue.getInstance());
 
                public GenerateEdges(long vertexCount, int scale, float A, 
float B, float C, boolean noiseEnabled, float noise) {
                        this.vertexCount = vertexCount;
@@ -206,7 +206,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
                }
 
                @Override
-               public void flatMap(BlockInfo<T> blockInfo, 
Collector<Edge<LongValue,NullValue>> out)
+               public void flatMap(BlockInfo<T> blockInfo, 
Collector<Edge<LongValue, NullValue>> out)
                                throws Exception {
                        RandomGenerator rng = 
blockInfo.getRandomGenerable().generator();
                        long edgesToGenerate = blockInfo.getElementCount();

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index e35b517..2eef7ae 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -58,16 +58,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                // Vertices
                long vertexCount = 2 * this.vertexPairCount;
 
-               DataSet<Vertex<LongValue,NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+               DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
                LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(0, vertexCount - 1);
 
-               DataSet<Edge<LongValue,NullValue>> edges = env
+               DataSet<Edge<LongValue, NullValue>> edges = env
                        .fromParallelCollection(iterator, LongValue.class)
                                .setParallelism(parallelism)
                                .name("Edge iterators")
@@ -81,16 +81,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
 
        @ForwardedFields("*->f0")
        private static class LinkVertexToSingletonNeighbor
-       implements MapFunction<LongValue, Edge<LongValue,NullValue>> {
+       implements MapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private LongValue source = new LongValue();
 
                private LongValue target = new LongValue();
 
-               private Edge<LongValue,NullValue> edge = new Edge<>(source, 
target, NullValue.getInstance());
+               private Edge<LongValue, NullValue> edge = new Edge<>(source, 
target, NullValue.getInstance());
 
                @Override
-               public Edge<LongValue,NullValue> map(LongValue value) throws 
Exception {
+               public Edge<LongValue, NullValue> map(LongValue value) throws 
Exception {
                        long val = value.getValue();
 
                        source.setValue(val);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index cb99f30..a47ae4d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -58,14 +58,14 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
        }
 
        @Override
-       public Graph<LongValue,NullValue,NullValue> generate() {
+       public Graph<LongValue, NullValue, NullValue> generate() {
                // Vertices
-               DataSet<Vertex<LongValue,NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+               DataSet<Vertex<LongValue, NullValue>> vertices = 
GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
                // Edges
                LongValueSequenceIterator iterator = new 
LongValueSequenceIterator(1, this.vertexCount - 1);
 
-               DataSet<Edge<LongValue,NullValue>> edges = env
+               DataSet<Edge<LongValue, NullValue>> edges = env
                        .fromParallelCollection(iterator, LongValue.class)
                                .setParallelism(parallelism)
                                .name("Edge iterators")
@@ -79,16 +79,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, 
NullValue> {
 
        @ForwardedFields("*->f0")
        public class LinkVertexToCenter
-       implements FlatMapFunction<LongValue, Edge<LongValue,NullValue>> {
+       implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
 
                private LongValue center = new LongValue(0);
 
-               private Edge<LongValue,NullValue> center_to_leaf = new 
Edge<>(center, null, NullValue.getInstance());
+               private Edge<LongValue, NullValue> center_to_leaf = new 
Edge<>(center, null, NullValue.getInstance());
 
-               private Edge<LongValue,NullValue> leaf_to_center = new 
Edge<>(null, center, NullValue.getInstance());
+               private Edge<LongValue, NullValue> leaf_to_center = new 
Edge<>(null, center, NullValue.getInstance());
 
                @Override
-               public void flatMap(LongValue leaf, 
Collector<Edge<LongValue,NullValue>> out)
+               public void flatMap(LongValue leaf, Collector<Edge<LongValue, 
NullValue>> out)
                                throws Exception {
                        center_to_leaf.f1 = leaf;
                        out.collect(center_to_leaf);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index dabeb06..6296618 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -60,10 +60,10 @@ import java.util.List;
  * grouping on edges on the vertex with the smaller degree.
  */
 public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
-       GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
+       GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 
        @Override
-       public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws 
Exception {
+       public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input) throws 
Exception {
 
                DataSet<Edge<K, EV>> edges = input.getEdges();
 
@@ -77,7 +77,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, 
EV> implements
                // project edges by vertex id
                DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new 
EdgeByIdProjector<K>());
 
-               DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
+               DataSet<Tuple3<K, K, K>> triangles = edgesByDegree
                                // build triads
                                .groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
                                .reduceGroup(new TriadBuilder<K>())
@@ -268,10 +268,10 @@ public class TriangleEnumerator<K extends Comparable<K>, 
VV, EV> implements
         * Filters triads (three vertices connected by two edges) without a closing third edge.
         */
        @SuppressWarnings("serial")
-       private static final class TriadFilter<K> implements 
JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
+       private static final class TriadFilter<K> implements 
JoinFunction<Triad<K>, Edge<K, NullValue>, Tuple3<K, K, K>> {
 
                @Override
-               public Tuple3<K,K,K> join(Triad<K> triad, Edge<K, NullValue> 
edge) throws Exception {
+               public Tuple3<K, K, K> join(Triad<K> triad, Edge<K, NullValue> 
edge) throws Exception {
                        return new Tuple3<>(triad.getFirstVertex(), 
triad.getSecondVertex(), triad.getThirdVertex());
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 93fb678..cad36b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -139,7 +139,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
                        throws Exception {
                // u, v, w, bitmask
                DataSet<TriangleListing.Result<K>> triangles = input
-                       .run(new TriangleListing<K,VV,EV>()
+                       .run(new TriangleListing<K, VV, EV>()
                                .setLittleParallelism(littleParallelism));
 
                // u, edge count

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index b22a0ce..99e3db9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -138,8 +138,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
        public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
                        throws Exception {
                // u, v, w
-               DataSet<Tuple3<K,K,K>> triangles = input
-                       .run(new TriangleListing<K,VV,EV>()
+               DataSet<Tuple3<K, K, K>> triangles = input
+                       .run(new TriangleListing<K, VV, EV>()
                                .setLittleParallelism(littleParallelism));
 
                // u, 1

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 5b2502e..7e8ebd7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -170,7 +170,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
                DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = 
initialVertices.map(
                                new InitializeWorkSet<K, VV, 
Message>()).returns(workSetTypeInfo);
 
-               final DeltaIteration<Vertex<K, VV>,     Tuple2<K, Either<NullValue, Message>>> iteration =
+               final DeltaIteration<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>> iteration =
                                initialVertices.iterateDelta(initialWorkSet, 
this.maximumNumberOfIterations, 0);
                setUpIteration(iteration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index a378ab1..9f5585d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -569,7 +569,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                TypeInformation<Vertex<K, VV>> vertexTypes = 
initialVertices.getType();
 
-               final DeltaIteration<Vertex<K, VV>,     Vertex<K, VV>> iteration =
+               final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
                                initialVertices.iterateDelta(initialVertices, 
this.maximumNumberOfIterations, 0);
                                setUpIteration(iteration);
 
@@ -635,7 +635,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
                DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> 
verticesWithDegrees = initialVertices
                                .join(degrees).where(0).equalTo(0)
-                               .with(new FlatJoinFunction<Vertex<K,VV>, 
Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() 
{
+                               .with(new FlatJoinFunction<Vertex<K, VV>, 
Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() 
{
                                        @Override
                                        public void join(Vertex<K, VV> vertex, 
Tuple3<K, LongValue, LongValue> degrees,
                                                                        
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
index 7a4a0e6..11e7a64 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -52,7 +52,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
        private static Map<GraphAlgorithmWrappingDataSet, 
List<GraphAlgorithmWrappingDataSet>> cache =
                Collections.synchronizedMap(new 
HashMap<GraphAlgorithmWrappingDataSet, List<GraphAlgorithmWrappingDataSet>>());
 
-       private Graph<K,VV,EV> input;
+       private Graph<K, VV, EV> input;
 
        private NoOpOperator<T> wrappingOperator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 8ef87a5..14c2da4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -38,24 +38,24 @@ public class AsmTestBase {
        protected ExecutionEnvironment env;
 
        // simple graph
-       protected Graph<IntValue,NullValue,NullValue> directedSimpleGraph;
+       protected Graph<IntValue, NullValue, NullValue> directedSimpleGraph;
 
-       protected Graph<IntValue,NullValue,NullValue> undirectedSimpleGraph;
+       protected Graph<IntValue, NullValue, NullValue> undirectedSimpleGraph;
 
        // complete graph
        protected final long completeGraphVertexCount = 47;
 
-       protected Graph<LongValue,NullValue,NullValue> completeGraph;
+       protected Graph<LongValue, NullValue, NullValue> completeGraph;
 
        // empty graph
        protected final long emptyGraphVertexCount = 3;
 
-       protected Graph<LongValue,NullValue,NullValue> emptyGraph;
+       protected Graph<LongValue, NullValue, NullValue> emptyGraph;
 
        // RMat graph
-       protected Graph<LongValue,NullValue,NullValue> directedRMatGraph;
+       protected Graph<LongValue, NullValue, NullValue> directedRMatGraph;
 
-       protected Graph<LongValue,NullValue,NullValue> undirectedRMatGraph;
+       protected Graph<LongValue, NullValue, NullValue> undirectedRMatGraph;
 
        @Before
        public void setup()
@@ -73,7 +73,7 @@ public class AsmTestBase {
                        new Object[]{5, 3},
                };
 
-               List<Edge<IntValue,NullValue>> directedEdgeList = new 
LinkedList<>();
+               List<Edge<IntValue, NullValue>> directedEdgeList = new 
LinkedList<>();
 
                for (Object[] edge : edges) {
                        directedEdgeList.add(new Edge<>(new IntValue((int) 
edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
@@ -95,7 +95,7 @@ public class AsmTestBase {
                long rmatVertexCount = 1L << 10;
                long rmatEdgeCount = 16 * rmatVertexCount;
 
-               Graph<LongValue,NullValue,NullValue> rmatGraph = new 
RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, 
rmatEdgeCount)
+               Graph<LongValue, NullValue, NullValue> rmatGraph = new 
RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, 
rmatEdgeCount)
                        .generate();
 
                directedRMatGraph = rmatGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index a4b3946..709317c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 public class SimplifyTest {
 
-       protected Graph<IntValue,NullValue,NullValue> graph;
+       protected Graph<IntValue, NullValue, NullValue> graph;
 
        @Before
        public void setup() {
@@ -65,7 +65,7 @@ public class SimplifyTest {
                        "(0,2,(null))\n" +
                        "(1,0,(null))";
 
-               Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+               Graph<IntValue, NullValue, NullValue> simpleGraph = graph
                        .run(new Simplify<IntValue, NullValue, NullValue>());
 
                
TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), 
expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
index 2ace2bd..d589000 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -32,7 +32,7 @@ import java.util.List;
 
 public class SimplifyTest {
 
-       protected Graph<IntValue,NullValue,NullValue> graph;
+       protected Graph<IntValue, NullValue, NullValue> graph;
 
        @Before
        public void setup() {
@@ -66,7 +66,7 @@ public class SimplifyTest {
                        "(1,0,(null))\n" +
                        "(2,0,(null))";
 
-               Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+               Graph<IntValue, NullValue, NullValue> simpleGraph = graph
                        .run(new Simplify<IntValue, NullValue, 
NullValue>(false));
 
                
TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), 
expectedResult);
@@ -79,7 +79,7 @@ public class SimplifyTest {
                        "(0,1,(null))\n" +
                        "(1,0,(null))";
 
-               Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+               Graph<IntValue, NullValue, NullValue> simpleGraph = graph
                        .run(new Simplify<IntValue, NullValue, 
NullValue>(true));
 
                
TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), 
expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index ba6bd05..7d6e3ea 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -86,7 +86,7 @@ public class TranslateTest {
        @Test
        public void testTranslateGraphIds()
                        throws Exception {
-               Graph<StringValue,LongValue, LongValue> stringIdGraph = graph
+               Graph<StringValue, LongValue, LongValue> stringIdGraph = graph
                        .translateGraphIds(new LongValueToStringValue());
 
                for (Vertex<StringValue, LongValue> vertex : 
stringIdGraph.getVertices().collect()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
index 6c0e094..cb06da5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 4;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CompleteGraph(env, vertexCount)
                        .generate();
 
                String vertices = "0; 1; 2; 3";
@@ -50,7 +50,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 10;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CompleteGraph(env, vertexCount)
                        .generate();
 
                assertEquals(vertexCount, graph.numberOfVertices());
@@ -72,12 +72,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
CompleteGraph(env, 10)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CompleteGraph(env, 10)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
index ec36aa7..be56f56 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
        @Test
        public void testGraph()
                        throws Exception {
-               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, 10)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CycleGraph(env, 10)
                        .generate();
 
                String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -49,7 +49,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 100;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CycleGraph(env, vertexCount)
                        .generate();
 
                assertEquals(vertexCount, graph.numberOfVertices());
@@ -71,12 +71,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
CycleGraph(env, 100)
+               Graph<LongValue, NullValue, NullValue> graph = new 
CycleGraph(env, 100)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
index d4a524f..c039607 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
        @Test
        public void testGraph()
                        throws Exception {
-               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, 10)
+               Graph<LongValue, NullValue, NullValue> graph = new 
EmptyGraph(env, 10)
                        .generate();
 
                String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -48,7 +48,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 100;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
EmptyGraph(env, vertexCount)
                        .generate();
 
                assertEquals(vertexCount, graph.numberOfVertices());
@@ -66,12 +66,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
EmptyGraph(env, 100)
+               Graph<LongValue, NullValue, NullValue> graph = new 
EmptyGraph(env, 100)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
index 9606d1a..6a01a34 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -79,14 +79,14 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+               Graph<LongValue, NullValue, NullValue> graph = new 
GridGraph(env)
                        .addDimension(3, false)
                        .addDimension(5, false)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
index d723ecb..49b0ba7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -73,12 +73,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
HypercubeGraph(env, 4)
+               Graph<LongValue, NullValue, NullValue> graph = new 
HypercubeGraph(env, 4)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
index 3c3ce8c..b5c7cd3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -34,7 +34,7 @@ extends AbstractGraphTest {
        @Test
        public void testGraph()
                        throws Exception {
-               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
10)
+               Graph<LongValue, NullValue, NullValue> graph = new 
PathGraph(env, 10)
                        .generate();
 
                String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -49,7 +49,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 100;
 
-               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
PathGraph(env, vertexCount)
                        .generate();
 
                assertEquals(vertexCount, graph.numberOfVertices());
@@ -71,12 +71,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 
100)
+               Graph<LongValue, NullValue, NullValue> graph = new 
PathGraph(env, 100)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
index a06c63f..2cda69a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
@@ -44,7 +44,7 @@ extends AbstractGraphTest {
 
                RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
                        .generate();
 
                assertTrue(vertexCount >= graph.numberOfVertices());
@@ -58,12 +58,12 @@ extends AbstractGraphTest {
 
                RandomGenerableFactory<JDKRandomGenerator> rnd = new 
JDKRandomGeneratorFactory();
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
RMatGraph<>(env, rnd, 100, 1000)
+               Graph<LongValue, NullValue, NullValue> graph = new 
RMatGraph<>(env, rnd, 100, 1000)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
index 44a4d99..5e9a79a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexPairCount = 5;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
                        .generate();
 
                String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -50,7 +50,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexPairCount = 10;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
SingletonEdgeGraph(env, vertexPairCount)
                        .generate();
 
                assertEquals(2 * vertexPairCount, graph.numberOfVertices());
@@ -72,12 +72,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new 
SingletonEdgeGraph(env, 10)
+               Graph<LongValue, NullValue, NullValue> graph = new 
SingletonEdgeGraph(env, 10)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
index c656cfb..4ccb804 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -36,7 +36,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 10;
 
-               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
StarGraph(env, vertexCount)
                        .generate();
 
                String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
@@ -51,7 +51,7 @@ extends AbstractGraphTest {
                        throws Exception {
                int vertexCount = 100;
 
-               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
vertexCount)
+               Graph<LongValue, NullValue, NullValue> graph = new 
StarGraph(env, vertexCount)
                        .generate();
 
                assertEquals(vertexCount, graph.numberOfVertices());
@@ -73,12 +73,12 @@ extends AbstractGraphTest {
                        throws Exception {
                int parallelism = 2;
 
-               Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 
100)
+               Graph<LongValue, NullValue, NullValue> graph = new 
StarGraph(env, 100)
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
-               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+               graph.getVertices().output(new 
DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+               graph.getEdges().output(new 
DiscardingOutputFormat<Edge<LongValue, NullValue>>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index a302a30..2cf9eac 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -48,7 +48,7 @@ public final class TestUtils {
         * @param <EV> the value type for edges
         * @throws Exception
         */
-       public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String 
expectedVertices, String expectedEdges)
+       public static <K, VV, EV> void compareGraph(Graph<K, VV, EV> graph, 
String expectedVertices, String expectedEdges)
                        throws Exception {
                compareVertices(graph, expectedVertices);
                compareEdges(graph, expectedEdges);
@@ -63,7 +63,7 @@ public final class TestUtils {
                                resultVertices.add(vertex.f0.toString());
                        }
 
-                       TestBaseUtils.compareResultAsText(resultVertices, 
expectedVertices.replaceAll("\\s","").replace(";", "\n"));
+                       TestBaseUtils.compareResultAsText(resultVertices, 
expectedVertices.replaceAll("\\s", "").replace(";", "\n"));
                }
        }
 
@@ -76,7 +76,7 @@ public final class TestUtils {
                                resultEdges.add(edge.f0.toString() + "," + 
edge.f1.toString());
                        }
 
-                       TestBaseUtils.compareResultAsText(resultEdges, 
expectedEdges.replaceAll("\\s","").replace(";", "\n"));
+                       TestBaseUtils.compareResultAsText(resultEdges, 
expectedEdges.replaceAll("\\s", "").replace(";", "\n"));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index 1551d84..679bf4f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -46,8 +46,8 @@ extends AsmTestBase {
                List<Tuple2<Double, Double>> expectedResults = new 
ArrayList<>();
                expectedResults.add(Tuple2.of(0.5446287864731747, 0.0));
                expectedResults.add(Tuple2.of(0.0, 0.8363240238999012));
-               
expectedResults.add(Tuple2.of(0.6072453524686667,0.26848532437604833));
-               
expectedResults.add(Tuple2.of(0.5446287864731747,0.39546603929699625));
+               expectedResults.add(Tuple2.of(0.6072453524686667, 
0.26848532437604833));
+               expectedResults.add(Tuple2.of(0.5446287864731747, 
0.39546603929699625));
                expectedResults.add(Tuple2.of(0.0, 0.26848532437604833));
                expectedResults.add(Tuple2.of(0.194966796646811, 0.0));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
index 04f0ca4..fb21c14 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
@@ -18,31 +18,31 @@
 
 package org.apache.flink.graph.pregel;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class PregelCompilerTest extends CompilerTestBase {
 
@@ -62,7 +62,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                                                .map(new 
Tuple2ToVertexMap<Long, Long>());
 
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
-                                       .map(new MapFunction<Tuple2<Long,Long>, 
Edge<Long, NullValue>>() {
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
 
                                                public Edge<Long, NullValue> 
map(Tuple2<Long, Long> edge) {
                                                        return new 
Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -136,7 +136,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                                                .map(new 
Tuple2ToVertexMap<Long, Long>());
 
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
-                                               .map(new 
MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+                                               .map(new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
                                                        public Edge<Long, 
NullValue> map(Tuple2<Long, Long> edge) {
                                                                return new 
Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -210,7 +210,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                                                .map(new 
Tuple2ToVertexMap<Long, Long>());
 
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
-                                       .map(new MapFunction<Tuple2<Long,Long>, 
Edge<Long, NullValue>>() {
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
 
                                                public Edge<Long, NullValue> 
map(Tuple2<Long, Long> edge) {
                                                        return new 
Edge<>(edge.f0, edge.f1, NullValue.getInstance());

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
index 8f9552f..3bf2e32 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
@@ -19,26 +19,26 @@
 
 package org.apache.flink.graph.pregel;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.DeltaIterationResultSet;
 import org.apache.flink.api.java.operators.SingleInputUdfOperator;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.TwoInputUdfOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class PregelTranslationTest {
@@ -71,7 +71,7 @@ public class PregelTranslationTest {
                                DataSet<Tuple2<String, String>> edges = 
env.fromElements(new Tuple2<>("a", "c"));
 
                                Graph<String, Double, NullValue> graph = 
Graph.fromTupleDataSet(initialVertices,
-                                               edges.map(new 
MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+                                               edges.map(new 
MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
                                                        public Tuple3<String, 
String, NullValue> map(
                                                                        
Tuple2<String, String> edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 14c2fb4..676e0cd 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -18,33 +18,33 @@
 
 package org.apache.flink.graph.spargel;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.optimizer.plan.DualInputPlanNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.PlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 
 public class SpargelCompilerTest extends CompilerTestBase {
@@ -65,7 +65,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
                                                .map(new 
Tuple2ToVertexMap<Long, Long>());
 
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
-                                       .map(new MapFunction<Tuple2<Long,Long>, 
Edge<Long, NullValue>>() {
+                                       .map(new MapFunction<Tuple2<Long, 
Long>, Edge<Long, NullValue>>() {
 
                                                public Edge<Long, NullValue> 
map(Tuple2<Long, Long> edge) {
                                                        return new 
Edge<>(edge.f0, edge.f1, NullValue.getInstance());
@@ -147,7 +147,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
                                                .map(new 
Tuple2ToVertexMap<Long, Long>());
 
                                DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
-                                               .map(new 
MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+                                               .map(new 
MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
 
                                                        public Edge<Long, 
NullValue> map(Tuple2<Long, Long> edge) {
                                                                return new 
Edge<>(edge.f0, edge.f1, NullValue.getInstance());

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index 2c7e093..47b785d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -72,7 +72,7 @@ public class SpargelTranslationTest {
                                DataSet<Tuple2<String, String>> edges = 
env.fromElements(new Tuple2<>("a", "c"));
 
                                Graph<String, Double, NullValue> graph = 
Graph.fromTupleDataSet(initialVertices,
-                                               edges.map(new 
MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+                                               edges.map(new 
MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
                                                        public Tuple3<String, 
String, NullValue> map(
                                                                        
Tuple2<String, String> edge) {
@@ -158,7 +158,7 @@ public class SpargelTranslationTest {
                                DataSet<Tuple2<String, String>> edges = 
env.fromElements(new Tuple2<>("a", "c"));
 
                                Graph<String, Double, NullValue> graph = 
Graph.fromTupleDataSet(initialVertices,
-                                               edges.map(new 
MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
+                                               edges.map(new 
MapFunction<Tuple2<String, String>, Tuple3<String, String, NullValue>>() {
 
                                                        public Tuple3<String, 
String, NullValue> map(
                                                                        
Tuple2<String, String> edge) {

http://git-wip-us.apache.org/repos/asf/flink/blob/53716a4d/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index 2213700..9100883 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -73,8 +73,8 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
                                new MessageFunction(), new UpdateFunction(), 
10, parameters);
 
-               DataSet<Vertex<Long,Long>> data = res.getVertices();
-               List<Vertex<Long,Long>> result= data.collect();
+               DataSet<Vertex<Long, Long>> data = res.getVertices();
+               List<Vertex<Long, Long>> result= data.collect();
 
                expectedResult = "1,11\n" +
                                                "2,11\n" +
@@ -109,7 +109,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                Assert.assertEquals(true, 
iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
 
                DataSet<Vertex<Long, Long>> data = 
TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
-        List<Vertex<Long,Long>> result= data.collect();
+        List<Vertex<Long, Long>> result= data.collect();
 
                expectedResult = "1,11\n" +
                                                "2,12\n" +

Reply via email to