http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
index b7197a4..79c3468 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -82,8 +80,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
index 1791f2e..45446b3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -80,8 +78,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
index e4e2960..dd0912a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -79,8 +77,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
index fc64d62..5d8bfc4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -124,8 +122,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
index 939d871..3590922 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -74,8 +72,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
index c40d456..7281770 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -89,8 +87,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
index 05f84cd..3ecd4a4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -81,8 +79,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
index 19b0f25..6817e92 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -79,8 +77,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
index 920fc4e..5f18a73 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.LongValue;
@@ -66,8 +64,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
index 045354d..f78b7ff 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -80,8 +78,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
index ee9df74..9e9da5f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -19,9 +19,7 @@
 package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -81,8 +79,8 @@ extends GraphGeneratorTestBase {
                        .setParallelism(parallelism)
                        .generate();
 
-               graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
-               graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+               graph.getVertices().output(new DiscardingOutputFormat<>());
+               graph.getEdges().output(new DiscardingOutputFormat<>());
 
                TestUtils.verifyParallelism(env, parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
index 70a2d15..c94e169 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -60,7 +60,7 @@ public class GSACompilerTest extends CompilerTestBase {
                // compose test program
 
                DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
-                       1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+                       1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<>());
 
                Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
 
@@ -68,7 +68,7 @@ public class GSACompilerTest extends CompilerTestBase {
                        new GatherNeighborIds(), new SelectMinId(),
                        new UpdateComponentId(), 100).getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+               result.output(new DiscardingOutputFormat<>());
 
                Plan p = env.createProgramPlan("GSA Connected Components");
                OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
index 0dfbcb7..0501820 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -72,7 +72,7 @@ public class GSATranslationTest {
                // ------------ construct the test program ------------------
 
                DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
-                       1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+                       1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<>());
 
                Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
 
@@ -89,7 +89,7 @@ public class GSATranslationTest {
                        new GatherNeighborIds(), new SelectMinId(),
                        new UpdateComponentId(), NUM_ITERATIONS, parameters).getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+               result.output(new DiscardingOutputFormat<>());
 
                // ------------- validate the java program ----------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
index 3e76005..8b8927e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -62,7 +62,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 
                Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
-               DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, Long, NullValue>(100));
+               DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<>(100));
 
                result.writeAsCsv(resultPath, "\n", " ");
                env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
index a0aba65..626b754 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficientTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoeffic
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
@@ -52,7 +51,7 @@ extends AsmTestBase {
                        "(5,1,0)";
 
                DataSet<Result<IntValue>> cc = directedSimpleGraph
-                       .run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
        }
@@ -64,7 +63,7 @@ extends AsmTestBase {
                long expectedTriangleCount = 2 * CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 
                DataSet<Result<LongValue>> cc = completeGraph
-                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                List<Result<LongValue>> results = cc.collect();
 
@@ -80,7 +79,7 @@ extends AsmTestBase {
        public void testRMatGraph()
                        throws Exception {
                DataSet<Result<LongValue>> cc = directedRMatGraph(10, 16)
-                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                Checksum checksum = new org.apache.flink.graph.asm.dataset.ChecksumHashCode<Result<LongValue>>()
                        .run(cc)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
index e8f8659..2d02907 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriangleListingTest.java
@@ -95,7 +95,7 @@ extends AsmTestBase {
                long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2) / 3;
 
                DataSet<Result<LongValue>> tl = completeGraph
-                       .run(new TriangleListing<LongValue, NullValue, NullValue>());
+                       .run(new TriangleListing<>());
 
                List<Result<LongValue>> results = tl.collect();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index e00669b..d77707e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoeff
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
 
 import org.apache.commons.math3.util.CombinatoricsUtils;
 import org.junit.Test;
@@ -53,7 +52,7 @@ extends AsmTestBase {
                        "(5,1,0)";
 
                DataSet<Result<IntValue>> cc = undirectedSimpleGraph
-                       .run(new LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                TestBaseUtils.compareResultAsText(cc.collect(), expectedResult);
        }
@@ -65,7 +64,7 @@ extends AsmTestBase {
                long expectedTriangleCount = CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2);
 
                DataSet<Result<LongValue>> cc = completeGraph
-                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                List<Result<LongValue>> results = cc.collect();
 
@@ -81,7 +80,7 @@ extends AsmTestBase {
        public void testRMatGraph()
                        throws Exception {
                DataSet<Result<LongValue>> cc = undirectedRMatGraph(10, 16)
-                       .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+                       .run(new LocalClusteringCoefficient<>());
 
                Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
                        .run(cc)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
index 6af1b01..2e34945 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleListingTest.java
@@ -95,7 +95,7 @@ extends AsmTestBase {
                long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int) expectedDegree, 2) / 3;
 
                DataSet<Result<LongValue>> tl = completeGraph
-                       .run(new TriangleListing<LongValue, NullValue, NullValue>());
+                       .run(new TriangleListing<>());
 
                Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
                        .run(tl)

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
index 4471105..2c94260 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/linkanalysis/HITSTest.java
@@ -115,7 +115,7 @@ extends AsmTestBase {
        public void testWithRMatGraph()
                        throws Exception {
                DataSet<Result<LongValue>> hits = directedRMatGraph(10, 16)
-                       .run(new HITS<LongValue, NullValue, NullValue>(0.000001));
+                       .run(new HITS<>(0.000001));
 
                Map<Long, Result<LongValue>> results = new HashMap<>();
                for (Result<LongValue> result :  new Collect<Result<LongValue>>().run(hits).execute()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
index 9b1d18c..4deb491 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/ChecksumHashCodeTest.java
@@ -41,7 +41,7 @@ extends AsmTestBase {
                        env);
 
                Checksum checksum = graph
-                       .run(new ChecksumHashCode<Long, Long, Long>())
+                       .run(new ChecksumHashCode<>())
                        .execute();
 
                assertEquals(checksum.getCount(), 12L);

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
index aa259a2..5afd0ee 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/AdamicAdarTest.java
@@ -49,7 +49,7 @@ extends AsmTestBase {
        public void testSimpleGraph()
                        throws Exception {
                DataSet<Result<IntValue>> aa = undirectedSimpleGraph
-                       .run(new AdamicAdar<IntValue, NullValue, NullValue>());
+                       .run(new AdamicAdar<>());
 
                String expectedResult =
                        "(0,1," + ilog[2] + ")\n" +
@@ -105,7 +105,7 @@ extends AsmTestBase {
                float expectedScore = (completeGraphVertexCount - 2) / (float) 
Math.log(completeGraphVertexCount - 1);
 
                DataSet<Result<LongValue>> aa = completeGraph
-                       .run(new AdamicAdar<LongValue, NullValue, NullValue>());
+                       .run(new AdamicAdar<>());
 
                for (Result<LongValue> result : aa.collect()) {
                        assertEquals(expectedScore, 
result.getAdamicAdarScore().getValue(), 0.00001);
@@ -116,7 +116,7 @@ extends AsmTestBase {
        public void testRMatGraph()
                        throws Exception {
                DataSet<Result<LongValue>> aa = undirectedRMatGraph(8, 8)
-                       .run(new AdamicAdar<LongValue, NullValue, NullValue>());
+                       .run(new AdamicAdar<>());
 
                assertEquals(13954, aa.count());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index d8cd298..2e59f93 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -42,7 +42,7 @@ extends AsmTestBase {
        public void testSimpleGraph()
                        throws Exception {
                DataSet<Result<IntValue>> ji = undirectedSimpleGraph
-                       .run(new JaccardIndex<IntValue, NullValue, 
NullValue>());
+                       .run(new JaccardIndex<>());
 
                String expectedResult =
                        "(0,1,1,4)\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
index 71937db..95dd96e 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelCompilerTest.java
@@ -61,7 +61,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 
                        DataSet<Vertex<Long, Long>> initialVertices = 
env.fromElements(
                                new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-                               .map(new Tuple2ToVertexMap<Long, Long>());
+                               .map(new Tuple2ToVertexMap<>());
 
                        DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
                                .map(new MapFunction<Tuple2<Long, Long>, 
Edge<Long, NullValue>>() {
@@ -76,7 +76,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                        DataSet<Vertex<Long, Long>> result = 
graph.runVertexCentricIteration(
                                new CCCompute(), null, 100).getVertices();
 
-                       result.output(new DiscardingOutputFormat<Vertex<Long, 
Long>>());
+                       result.output(new DiscardingOutputFormat<>());
                }
 
                Plan p = env.createProgramPlan("Pregel Connected Components");
@@ -126,7 +126,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 
                        DataSet<Vertex<Long, Long>> initialVertices = 
env.fromElements(
                                new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-                               .map(new Tuple2ToVertexMap<Long, Long>());
+                               .map(new Tuple2ToVertexMap<>());
 
                        DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
                                .map(new MapFunction<Tuple2<Long, Long>, 
Edge<Long, NullValue>>() {
@@ -145,7 +145,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                                new CCCompute(), null, 100, parameters)
                                .getVertices();
 
-                       result.output(new DiscardingOutputFormat<Vertex<Long, 
Long>>());
+                       result.output(new DiscardingOutputFormat<>());
                }
 
                Plan p = env.createProgramPlan("Pregel Connected Components");
@@ -192,7 +192,7 @@ public class PregelCompilerTest extends CompilerTestBase {
 
                        DataSet<Vertex<Long, Long>> initialVertices = 
env.fromElements(
                                new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-                               .map(new Tuple2ToVertexMap<Long, Long>());
+                               .map(new Tuple2ToVertexMap<>());
 
                        DataSet<Edge<Long, NullValue>> edges = 
env.fromElements(new Tuple2<>(1L, 2L))
                                .map(new MapFunction<Tuple2<Long, Long>, 
Edge<Long, NullValue>>() {
@@ -207,7 +207,7 @@ public class PregelCompilerTest extends CompilerTestBase {
                        DataSet<Vertex<Long, Long>> result = 
graph.runVertexCentricIteration(
                                new CCCompute(), new CCCombiner(), 
100).getVertices();
 
-                       result.output(new DiscardingOutputFormat<Vertex<Long, 
Long>>());
+                       result.output(new DiscardingOutputFormat<>());
                }
 
                Plan p = env.createProgramPlan("Pregel Connected Components");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
index 8084e71..0e79f65 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/pregel/PregelTranslationTest.java
@@ -88,7 +88,7 @@ public class PregelTranslationTest {
                result = graph.runVertexCentricIteration(new MyCompute(), null,
                        NUM_ITERATIONS, parameters).getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<String, 
Double>>());
+               result.output(new DiscardingOutputFormat<>());
 
                // ------------- validate the java program ----------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
index 1c6d08e..901276c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -62,7 +62,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
                // compose test program
                DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
                        new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-                       .map(new Tuple2ToVertexMap<Long, Long>());
+                       .map(new Tuple2ToVertexMap<>());
 
                DataSet<Edge<Long, NullValue>> edges = env.fromElements(new 
Tuple2<>(1L, 2L))
                        .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, 
NullValue>>() {
@@ -75,11 +75,11 @@ public class SpargelCompilerTest extends CompilerTestBase {
                Graph<Long, Long, NullValue> graph = 
Graph.fromDataSet(initialVertices, edges, env);
 
                DataSet<Vertex<Long, Long>> result = 
graph.runScatterGatherIteration(
-                       new ConnectedComponents.CCMessenger<Long, 
Long>(BasicTypeInfo.LONG_TYPE_INFO),
-                       new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+                       new 
ConnectedComponents.CCMessenger<>(BasicTypeInfo.LONG_TYPE_INFO),
+                       new ConnectedComponents.CCUpdater<>(), 100)
                        .getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+               result.output(new DiscardingOutputFormat<>());
 
                Plan p = env.createProgramPlan("Spargel Connected Components");
                OptimizedPlan op = compileNoStats(p);
@@ -136,7 +136,7 @@ public class SpargelCompilerTest extends CompilerTestBase {
 
                DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
                        new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
-                       .map(new Tuple2ToVertexMap<Long, Long>());
+                       .map(new Tuple2ToVertexMap<>());
 
                DataSet<Edge<Long, NullValue>> edges = env.fromElements(new 
Tuple2<>(1L, 2L))
                        .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, 
NullValue>>() {
@@ -153,11 +153,11 @@ public class SpargelCompilerTest extends CompilerTestBase 
{
                
parameters.addBroadcastSetForGatherFunction(broadcastVariableName, bcVar);
 
                DataSet<Vertex<Long, Long>> result = 
graph.runScatterGatherIteration(
-                       new ConnectedComponents.CCMessenger<Long, 
Long>(BasicTypeInfo.LONG_TYPE_INFO),
-                       new ConnectedComponents.CCUpdater<Long, Long>(), 100)
+                       new 
ConnectedComponents.CCMessenger<>(BasicTypeInfo.LONG_TYPE_INFO),
+                       new ConnectedComponents.CCUpdater<>(), 100)
                        .getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+               result.output(new DiscardingOutputFormat<>());
 
                Plan p = env.createProgramPlan("Spargel Connected Components");
                OptimizedPlan op = compileNoStats(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
index d209a2d..cbbf0c4 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelTranslationTest.java
@@ -91,7 +91,7 @@ public class SpargelTranslationTest {
                result = graph.runScatterGatherIteration(new 
MessageFunctionNoEdgeValue(), new UpdateFunction(),
                        NUM_ITERATIONS, parameters).getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<String, 
Double>>());
+               result.output(new DiscardingOutputFormat<>());
 
                // ------------- validate the java program ----------------
 
@@ -154,7 +154,7 @@ public class SpargelTranslationTest {
                result = graph.runScatterGatherIteration(new 
MessageFunctionNoEdgeValue(), new UpdateFunction(),
                        NUM_ITERATIONS, parameters).getVertices();
 
-               result.output(new DiscardingOutputFormat<Vertex<String, 
Double>>());
+               result.output(new DiscardingOutputFormat<>());
 
                // ------------- validate the java program ----------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index 2454b38..8b1ab91 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.test;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.GatherFunction;
@@ -53,8 +52,9 @@ public class CollectionModeSuperstepITCase extends TestLogger 
{
                                new MessageFunction(), new UpdateFunction(), 
10);
 
                result.getVertices().map(
-                               new VertexToTuple2Map<Long, Long>()).output(
-                                               new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       new VertexToTuple2Map<>()).output(
+                               new DiscardingOutputFormat<>());
+
                env.execute();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index 139ff1e..94d981c 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -137,7 +137,7 @@ public class ScatterGatherConfigurationITCase extends 
MultipleProgramsTestBase {
                Graph<Long, Long, Long> res = graph.runScatterGatherIteration(
                        new MessageFunctionDefault(), new 
UpdateFunctionDefault(), 5);
 
-               DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new 
VertexToTuple2Map<Long, Long>());
+               DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new 
VertexToTuple2Map<>());
                List<Tuple2<Long, Long>> result = data.collect();
 
                expectedResult = "1,6\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index 8b726f4..f01daec 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -20,14 +20,12 @@ package org.apache.flink.graph.test.operations;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.test.util.TestEnvironment;
-import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -77,7 +75,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
                                
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
 
                try {
-                       graph.outDegrees().output(new 
DiscardingOutputFormat<Tuple2<Long, LongValue>>());
+                       graph.outDegrees().output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("graph.outDegrees() did not fail.");
@@ -100,7 +98,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
                                
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
 
                try {
-                       graph.inDegrees().output(new 
DiscardingOutputFormat<Tuple2<Long, LongValue>>());
+                       graph.inDegrees().output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("graph.inDegrees() did not fail.");
@@ -123,7 +121,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
                                
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
 
                try {
-                       graph.getDegrees().output(new 
DiscardingOutputFormat<Tuple2<Long, LongValue>>());
+                       graph.getDegrees().output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("graph.getDegrees() did not fail.");
@@ -146,7 +144,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
                                
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
 
                try {
-                       graph.getDegrees().output(new 
DiscardingOutputFormat<Tuple2<Long, LongValue>>());
+                       graph.getDegrees().output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("graph.getDegrees() did not fail.");
@@ -169,7 +167,7 @@ public class DegreesWithExceptionITCase extends TestLogger {
                                
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
 
                try {
-                       graph.getDegrees().output(new 
DiscardingOutputFormat<Tuple2<Long, LongValue>>());
+                       graph.getDegrees().output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("graph.getDegrees() did not fail.");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
index 991a420..eb2fe5b 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java
@@ -122,7 +122,7 @@ public class GraphCreationITCase extends 
MultipleProgramsTestBase {
                DataSet<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdgeData(env);
 
                Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, 
edges, env);
-               Boolean valid = graph.validate(new 
InvalidVertexIdsValidator<Long, Long, Long>());
+               Boolean valid = graph.validate(new 
InvalidVertexIdsValidator<>());
 
                //env.fromElements(result).writeAsText(resultPath);
 
@@ -144,7 +144,7 @@ public class GraphCreationITCase extends 
MultipleProgramsTestBase {
                DataSet<Edge<Long, Long>> edges = 
TestGraphUtils.getLongLongEdgeData(env);
 
                Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, 
edges, env);
-               Boolean valid = graph.validate(new 
InvalidVertexIdsValidator<Long, Long, Long>());
+               Boolean valid = graph.validate(new 
InvalidVertexIdsValidator<>());
 
                String res = valid.toString(); //env.fromElements(valid);
                List<String> result = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
index 43ff124..e3909ca 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -62,7 +62,7 @@ public class JoinWithEdgesITCase extends 
MultipleProgramsTestBase {
                        TestGraphUtils.getLongLongEdgeData(env), env);
 
                Graph<Long, Long, Long> res = 
graph.joinWithEdges(graph.getEdges()
-                       .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
+                       .map(new EdgeToTuple3Map<>()), new AddValuesMapper());
 
                DataSet<Edge<Long, Long>> data = res.getEdges();
                List<Edge<Long, Long>> result = data.collect();
@@ -90,7 +90,7 @@ public class JoinWithEdgesITCase extends 
MultipleProgramsTestBase {
                        TestGraphUtils.getLongLongEdgeData(env), env);
 
                Graph<Long, Long, Long> res = 
graph.joinWithEdges(graph.getEdges().first(3)
-                       .map(new EdgeToTuple3Map<Long, Long>()), new 
AddValuesMapper());
+                       .map(new EdgeToTuple3Map<>()), new AddValuesMapper());
 
                DataSet<Edge<Long, Long>> data = res.getEdges();
                List<Edge<Long, Long>> result = data.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
index 181c1a7..fe5c52b 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -60,7 +60,7 @@ public class JoinWithVerticesITCase extends 
MultipleProgramsTestBase {
                        TestGraphUtils.getLongLongEdgeData(env), env);
 
                Graph<Long, Long, Long> res = 
graph.joinWithVertices(graph.getVertices()
-                       .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
+                       .map(new VertexToTuple2Map<>()), new AddValuesMapper());
 
                DataSet<Vertex<Long, Long>> data = res.getVertices();
                List<Vertex<Long, Long>> result = data.collect();
@@ -86,7 +86,7 @@ public class JoinWithVerticesITCase extends 
MultipleProgramsTestBase {
                        TestGraphUtils.getLongLongEdgeData(env), env);
 
                Graph<Long, Long, Long> res = 
graph.joinWithVertices(graph.getVertices().first(3)
-                       .map(new VertexToTuple2Map<Long, Long>()), new 
AddValuesMapper());
+                       .map(new VertexToTuple2Map<>()), new AddValuesMapper());
 
                DataSet<Vertex<Long, Long>> data = res.getVertices();
                List<Vertex<Long, Long>> result = data.collect();

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
index 19e701d..6733ba1 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesWithExceptionITCase.java
@@ -84,7 +84,7 @@ public class ReduceOnEdgesWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
                                        graph.groupReduceOnEdges(new 
SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
 
-                       verticesWithAllNeighbors.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithAllNeighbors.output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("Expected an exception.");
@@ -110,7 +110,7 @@ public class ReduceOnEdgesWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
                                        graph.groupReduceOnEdges(new 
SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
 
-                       verticesWithAllNeighbors.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithAllNeighbors.output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("Expected an exception.");

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
index d3b97a1..1a60874 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborsWithExceptionITCase.java
@@ -86,7 +86,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues =
                                        graph.groupReduceOnNeighbors(new 
SumAllNeighbors(), EdgeDirection.ALL);
 
-                       verticesWithSumOfOutNeighborValues.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithSumOfOutNeighborValues.output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("Expected an exception.");
@@ -113,7 +113,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> 
verticesWithSumOfOutNeighborValues =
                                        graph.groupReduceOnNeighbors(new 
SumAllNeighbors(), EdgeDirection.ALL);
 
-                       verticesWithSumOfOutNeighborValues.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithSumOfOutNeighborValues.output(new 
DiscardingOutputFormat<>());
                        env.execute();
 
                        fail("Expected an exception.");
@@ -140,7 +140,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> 
verticesWithSumOfAllNeighborValues =
                                        graph.reduceOnNeighbors(new 
SumNeighbors(), EdgeDirection.ALL);
 
-                       verticesWithSumOfAllNeighborValues.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithSumOfAllNeighborValues.output(new 
DiscardingOutputFormat<>());
                        env.execute();
                } catch (Exception e) {
                        // We expect the job to fail with an exception
@@ -165,7 +165,7 @@ public class ReduceOnNeighborsWithExceptionITCase extends 
TestLogger {
                        DataSet<Tuple2<Long, Long>> 
verticesWithSumOfAllNeighborValues =
                                        graph.reduceOnNeighbors(new 
SumNeighbors(), EdgeDirection.ALL);
 
-                       verticesWithSumOfAllNeighborValues.output(new 
DiscardingOutputFormat<Tuple2<Long, Long>>());
+                       verticesWithSumOfAllNeighborValues.output(new 
DiscardingOutputFormat<>());
                        env.execute();
                } catch (Exception e) {
                        // We expect the job to fail with an exception

http://git-wip-us.apache.org/repos/asf/flink/blob/27429a74/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
index 4d9040c..cad07c0 100644
--- 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/TypeExtractorTest.java
@@ -58,7 +58,7 @@ public class TypeExtractorTest {
        public void testMapVerticesType() throws Exception {
 
                // test type extraction in mapVertices
-               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = 
inputGraph.mapVertices(new VertexMapper<Long>()).getVertices();
+               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = 
inputGraph.mapVertices(new VertexMapper<>()).getVertices();
                Assert.assertTrue(new TupleTypeInfo(Vertex.class, 
BasicTypeInfo.LONG_TYPE_INFO,
                        new TupleTypeInfo<Tuple2<Long, 
Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                        .equals(outVertices.getType()));
@@ -68,7 +68,7 @@ public class TypeExtractorTest {
        public void testMapEdgesType() throws Exception {
 
                // test type extraction in mapEdges
-               DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = inputGraph.mapEdges(new EdgeMapper<Long>()).getEdges();
+               DataSet<Edge<Long, Tuple2<Long, Integer>>> outEdges = inputGraph.mapEdges(new EdgeMapper<>()).getEdges();
                Assert.assertTrue(new TupleTypeInfo(Edge.class, BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
                        new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                        .equals(outEdges.getType()));
@@ -76,7 +76,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testFromDataSet() throws Exception {
-               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = Graph.fromDataSet(edges, new VertexInitializer<Long>(), env)
+               DataSet<Vertex<Long, Tuple2<Long, Integer>>> outVertices = Graph.fromDataSet(edges, new VertexInitializer<>(), env)
                        .getVertices();
                Assert.assertTrue(new TupleTypeInfo(Vertex.class, BasicTypeInfo.LONG_TYPE_INFO,
                        new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
@@ -85,7 +85,7 @@ public class TypeExtractorTest {
 
        @Test
        public void testGroupReduceOnEdges() throws Exception {
-               DataSet<Tuple2<Long, Long>> output = inputGraph.groupReduceOnEdges(new EdgesGroupFunction<Long, Long>(), EdgeDirection.OUT);
+               DataSet<Tuple2<Long, Long>> output = inputGraph.groupReduceOnEdges(new EdgesGroupFunction<>(), EdgeDirection.OUT);
                Assert.assertTrue((new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)).equals(output.getType()));
        }
 
@@ -113,7 +113,7 @@ public class TypeExtractorTest {
 
                @Override
                public void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<Tuple2<K, EV>> out) throws Exception {
-                       out.collect(new Tuple2<K, EV>());
+                       out.collect(new Tuple2<>());
                }
        }
 

Reply via email to