[FLINK-4729] [gelly] Use optional VertexCentric CombineFunction

Passes the optional CombineFunction through to VertexCentricIteration, and
includes other code cleanup identified by IntelliJ's code analyzer.

This closes #2587


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb34133e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb34133e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb34133e

Branch: refs/heads/master
Commit: bb34133ed30b2a6c74baa0e5342e278e039cbe2a
Parents: 8c7c42f
Author: Greg Hogan <[email protected]>
Authored: Mon Oct 3 13:59:38 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Wed Oct 5 12:26:16 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Edge.java  |   2 +-
 .../main/java/org/apache/flink/graph/Graph.java | 165 +++++++++----------
 .../org/apache/flink/graph/GraphAlgorithm.java  |   2 +-
 .../org/apache/flink/graph/GraphCsvReader.java  |  10 +-
 .../flink/graph/IterationConfiguration.java     |   2 +-
 .../java/org/apache/flink/graph/Triplet.java    |   6 +-
 .../apache/flink/graph/VertexJoinFunction.java  |   5 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |   6 +-
 .../flink/graph/gsa/GSAConfiguration.java       |   6 +-
 .../apache/flink/graph/gsa/GatherFunction.java  |   6 +-
 .../graph/gsa/GatherSumApplyIteration.java      |  29 ++--
 .../org/apache/flink/graph/gsa/SumFunction.java |   6 +-
 .../library/GSASingleSourceShortestPaths.java   |   6 +-
 .../flink/graph/library/LabelPropagation.java   |   2 +-
 .../library/SingleSourceShortestPaths.java      |   2 +-
 .../flink/graph/library/TriangleEnumerator.java |   2 +-
 .../flink/graph/pregel/ComputeFunction.java     |  24 +--
 .../flink/graph/pregel/MessageCombiner.java     |  13 +-
 .../pregel/VertexCentricConfiguration.java      |   2 +-
 .../graph/pregel/VertexCentricIteration.java    |  45 +++--
 .../flink/graph/spargel/GatherFunction.java     |  10 +-
 .../flink/graph/spargel/ScatterFunction.java    |  12 +-
 .../graph/spargel/ScatterGatherIteration.java   |   5 +-
 .../validation/InvalidVertexIdsValidator.java   |   6 +-
 .../graph/asm/translate/TranslateTest.java      |   1 -
 .../test/GatherSumApplyConfigurationITCase.java |  33 ++--
 .../test/ScatterGatherConfigurationITCase.java  |  17 +-
 .../operations/GraphCreationWithCsvITCase.java  |   5 +-
 .../test/operations/GraphOperationsITCase.java  |  55 +++----
 .../ReduceOnNeighborMethodsITCase.java          |   5 +-
 30 files changed, 235 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
index d84badb..2bcce29 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
@@ -46,7 +46,7 @@ public class Edge<K, V> extends Tuple3<K, K, V>{
         * and the target is the original Edge's source.
         */
        public Edge<K, V> reverse() {
-                       return new Edge<K, V>(this.f1, this.f0, this.f2);
+                       return new Edge<>(this.f1, this.f0, this.f2);
        }
 
        public void setSource(K src) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 821b0a7..02d1eeb 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -66,6 +66,7 @@ import org.apache.flink.util.Collector;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -160,7 +161,7 @@ public class Graph<K, VV, EV> {
        public static <K, VV, EV> Graph<K, VV, EV> 
fromDataSet(DataSet<Vertex<K, VV>> vertices,
                        DataSet<Edge<K, EV>> edges, ExecutionEnvironment 
context) {
 
-               return new Graph<K, VV, EV>(vertices, edges, context);
+               return new Graph<>(vertices, edges, context);
        }
 
        /**
@@ -177,15 +178,15 @@ public class Graph<K, VV, EV> {
 
                DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new 
EmitSrcAndTarget<K, EV>()).distinct();
 
-               return new Graph<K, NullValue, EV>(vertices, edges, context);
+               return new Graph<>(vertices, edges, context);
        }
 
        private static final class EmitSrcAndTarget<K, EV> implements 
FlatMapFunction<
                        Edge<K, EV>, Vertex<K, NullValue>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, 
NullValue>> out) {
-                       out.collect(new Vertex<K, NullValue>(edge.f0, 
NullValue.getInstance()));
-                       out.collect(new Vertex<K, NullValue>(edge.f1, 
NullValue.getInstance()));
+                       out.collect(new Vertex<>(edge.f0, 
NullValue.getInstance()));
+                       out.collect(new Vertex<>(edge.f1, 
NullValue.getInstance()));
                }
        }
 
@@ -216,19 +217,19 @@ public class Graph<K, VV, EV> {
                                .flatMap(new EmitSrcAndTargetAsTuple1<K, 
EV>()).distinct()
                                .map(new MapFunction<Tuple1<K>, Vertex<K, 
VV>>() {
                                        public Vertex<K, VV> map(Tuple1<K> 
value) throws Exception {
-                                               return new Vertex<K, 
VV>(value.f0, vertexValueInitializer.map(value.f0));
+                                               return new Vertex<>(value.f0, 
vertexValueInitializer.map(value.f0));
                                        }
                                
}).returns(returnType).withForwardedFields("f0");
 
-               return new Graph<K, VV, EV>(vertices, edges, context);
+               return new Graph<>(vertices, edges, context);
        }
 
        private static final class EmitSrcAndTargetAsTuple1<K, EV> implements 
FlatMapFunction<
                Edge<K, EV>, Tuple1<K>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
-                       out.collect(new Tuple1<K>(edge.f0));
-                       out.collect(new Tuple1<K>(edge.f1));
+                       out.collect(new Tuple1<>(edge.f0));
+                       out.collect(new Tuple1<>(edge.f1));
                }
        }
 
@@ -316,7 +317,7 @@ public class Graph<K, VV, EV> {
                                new MapFunction<Tuple2<K, K>, Edge<K, 
NullValue>>() {
 
                                        public Edge<K, NullValue> map(Tuple2<K, 
K> input) {
-                                               return new Edge<K, 
NullValue>(input.f0, input.f1, NullValue.getInstance());
+                                               return new Edge<>(input.f0, 
input.f1, NullValue.getInstance());
                                        }
                }).withForwardedFields("f0; f1");
                return fromDataSet(edgeDataSet, context);
@@ -344,7 +345,7 @@ public class Graph<K, VV, EV> {
                                new MapFunction<Tuple2<K, K>, Edge<K, 
NullValue>>() {
 
                                        public Edge<K, NullValue> map(Tuple2<K, 
K> input) {
-                                               return new Edge<K, 
NullValue>(input.f0, input.f1, NullValue.getInstance());
+                                               return new Edge<>(input.f0, 
input.f1, NullValue.getInstance());
                                        }
                                }).withForwardedFields("f0; f1");
                return fromDataSet(edgeDataSet, vertexValueInitializer, 
context);
@@ -472,7 +473,7 @@ public class Graph<K, VV, EV> {
                public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, 
Collector<Tuple4<K, K, VV, EV>> collector)
                                throws Exception {
 
-                       collector.collect(new Tuple4<K, K, VV, 
EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
+                       collector.collect(new Tuple4<>(edge.getSource(), 
edge.getTarget(), vertex.getValue(),
                                        edge.getValue()));
                }
        }
@@ -486,7 +487,7 @@ public class Graph<K, VV, EV> {
                public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
                                                Vertex<K, VV> vertex, 
Collector<Triplet<K, VV, EV>> collector) throws Exception {
 
-                       collector.collect(new Triplet<K, VV, 
EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
+                       collector.collect(new 
Triplet<>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
                                        tripletWithSrcValSet.f2, 
vertex.getValue(), tripletWithSrcValSet.f3));
                }
        }
@@ -521,13 +522,13 @@ public class Graph<K, VV, EV> {
                DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
                                new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() 
{
                                        public Vertex<K, NV> map(Vertex<K, VV> 
value) throws Exception {
-                                               return new Vertex<K, 
NV>(value.f0, mapper.map(value));
+                                               return new Vertex<>(value.f0, 
mapper.map(value));
                                        }
                                })
                                .returns(returnType)
                                .withForwardedFields("f0");
 
-               return new Graph<K, NV, EV>(mappedVertices, this.edges, 
this.context);
+               return new Graph<>(mappedVertices, this.edges, this.context);
        }
 
        /**
@@ -596,14 +597,13 @@ public class Graph<K, VV, EV> {
                DataSet<Edge<K, NV>> mappedEdges = edges.map(
                                new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
                                        public Edge<K, NV> map(Edge<K, EV> 
value) throws Exception {
-                                               return new Edge<K, 
NV>(value.f0, value.f1, mapper
-                                                               .map(value));
+                                               return new Edge<>(value.f0, 
value.f1, mapper.map(value));
                                        }
                                })
                                .returns(returnType)
                                .withForwardedFields("f0; f1");
 
-               return new Graph<K, VV, NV>(this.vertices, mappedEdges, 
this.context);
+               return new Graph<>(this.vertices, mappedEdges, this.context);
        }
 
        /**
@@ -628,7 +628,7 @@ public class Graph<K, VV, EV> {
                DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new ApplyCoGroupToVertexValues<K, VV, 
T>(vertexJoinFunction));
-               return new Graph<K, VV, EV>(resultedVertices, this.edges, 
this.context);
+               return new Graph<>(resultedVertices, this.edges, this.context);
        }
 
        private static final class ApplyCoGroupToVertexValues<K, VV, T>
@@ -651,7 +651,7 @@ public class Graph<K, VV, EV> {
                                if (inputIterator.hasNext()) {
                                        final Tuple2<K, T> inputNext = 
inputIterator.next();
 
-                                       collector.collect(new Vertex<K, 
VV>(inputNext.f0, vertexJoinFunction
+                                       collector.collect(new 
Vertex<>(inputNext.f0, vertexJoinFunction
                                                        
.vertexJoin(vertexIterator.next().f1, inputNext.f1)));
                                } else {
                                        
collector.collect(vertexIterator.next());
@@ -681,7 +681,7 @@ public class Graph<K, VV, EV> {
                DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
                                .coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
                                .with(new ApplyCoGroupToEdgeValues<K, EV, 
T>(edgeJoinFunction));
-               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
+               return new Graph<>(this.vertices, resultedEdges, this.context);
        }
 
        private static final class ApplyCoGroupToEdgeValues<K, EV, T>
@@ -704,7 +704,7 @@ public class Graph<K, VV, EV> {
                                if (inputIterator.hasNext()) {
                                        final Tuple3<K, K, T> inputNext = 
inputIterator.next();
 
-                                       collector.collect(new Edge<K, 
EV>(inputNext.f0,
+                                       collector.collect(new 
Edge<>(inputNext.f0,
                                                        inputNext.f1, 
edgeJoinFunction.edgeJoin(
                                                                        
edgesIterator.next().f2, inputNext.f2)));
                                } else {
@@ -736,7 +736,7 @@ public class Graph<K, VV, EV> {
                                .coGroup(inputDataSet).where(0).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
-               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
+               return new Graph<>(this.vertices, resultedEdges, this.context);
        }
 
        private static final class 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
@@ -761,7 +761,7 @@ public class Graph<K, VV, EV> {
                                while (edgesIterator.hasNext()) {
                                        Edge<K, EV> edgesNext = 
edgesIterator.next();
 
-                                       collector.collect(new Edge<K, 
EV>(edgesNext.f0,
+                                       collector.collect(new 
Edge<>(edgesNext.f0,
                                                        edgesNext.f1, 
edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
                                }
 
@@ -795,7 +795,7 @@ public class Graph<K, VV, EV> {
                                .coGroup(inputDataSet).where(1).equalTo(0)
                                .with(new 
ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
-               return new Graph<K, VV, EV>(this.vertices, resultedEdges, 
this.context);
+               return new Graph<>(this.vertices, resultedEdges, this.context);
        }
 
        /**
@@ -817,8 +817,7 @@ public class Graph<K, VV, EV> {
 
                DataSet<Edge<K, EV>> filteredEdges = 
remainingEdges.filter(edgeFilter);
 
-               return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
-                               this.context);
+               return new Graph<>(filteredVertices, filteredEdges, 
this.context);
        }
 
        /**
@@ -837,7 +836,7 @@ public class Graph<K, VV, EV> {
                                .join(filteredVertices).where(1).equalTo(0)
                                .with(new ProjectEdge<K, VV, EV>());
 
-               return new Graph<K, VV, EV>(filteredVertices, remainingEdges, 
this.context);
+               return new Graph<>(filteredVertices, remainingEdges, 
this.context);
        }
 
        /**
@@ -850,7 +849,7 @@ public class Graph<K, VV, EV> {
        public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> 
edgeFilter) {
                DataSet<Edge<K, EV>> filteredEdges = 
this.edges.filter(edgeFilter);
 
-               return new Graph<K, VV, EV>(this.vertices, filteredEdges, 
this.context);
+               return new Graph<>(this.vertices, filteredEdges, this.context);
        }
 
        @ForwardedFieldsFirst("f0; f1; f2")
@@ -924,7 +923,7 @@ public class Graph<K, VV, EV> {
        public Graph<K, VV, EV> getUndirected() {
 
                DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new 
RegularAndReversedEdgesMap<K, EV>());
-               return new Graph<K, VV, EV>(vertices, undirectedEdges, 
this.context);
+               return new Graph<>(vertices, undirectedEdges, this.context);
        }
 
        /**
@@ -947,13 +946,13 @@ public class Graph<K, VV, EV> {
                switch (direction) {
                case IN:
                        return vertices.coGroup(edges).where(0).equalTo(1)
-                                       .with(new ApplyCoGroupFunction<K, VV, 
EV, T>(edgesFunction));
+                                       .with(new 
ApplyCoGroupFunction<>(edgesFunction));
                case OUT:
                        return vertices.coGroup(edges).where(0).equalTo(0)
-                                       .with(new ApplyCoGroupFunction<K, VV, 
EV, T>(edgesFunction));
+                                       .with(new 
ApplyCoGroupFunction<>(edgesFunction));
                case ALL:
                        return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, VV, EV>()))
-                                       .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
+                                       .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<>(edgesFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
@@ -980,13 +979,13 @@ public class Graph<K, VV, EV> {
                switch (direction) {
                        case IN:
                                return 
vertices.coGroup(edges).where(0).equalTo(1)
-                                               .with(new 
ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .with(new 
ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
                        case OUT:
                                return 
vertices.coGroup(edges).where(0).equalTo(0)
-                                               .with(new 
ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .with(new 
ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo);
                        case ALL:
                                return vertices.coGroup(edges.flatMap(new 
EmitOneEdgePerNode<K, VV, EV>()))
-                                               .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .where(0).equalTo(0).with(new 
ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo);
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
@@ -1013,14 +1012,14 @@ public class Graph<K, VV, EV> {
                case IN:
                        return edges.map(new ProjectVertexIdMap<K, EV>(1))
                                        .withForwardedFields("f1->f0")
-                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction));
                case OUT:
                        return edges.map(new ProjectVertexIdMap<K, EV>(0))
                                        .withForwardedFields("f0")
-                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction));
                case ALL:
                        return edges.flatMap(new EmitOneEdgePerNode<K, VV, 
EV>())
-                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
+                                       .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
@@ -1048,14 +1047,14 @@ public class Graph<K, VV, EV> {
                        case IN:
                                return edges.map(new ProjectVertexIdMap<K, 
EV>(1))
                                                .withForwardedFields("f1->f0")
-                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
                        case OUT:
                                return edges.map(new ProjectVertexIdMap<K, 
EV>(0))
                                                .withForwardedFields("f0")
-                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
                        case ALL:
                                return edges.flatMap(new EmitOneEdgePerNode<K, 
VV, EV>())
-                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
+                                               .groupBy(0).reduceGroup(new 
ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo);
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
@@ -1072,7 +1071,7 @@ public class Graph<K, VV, EV> {
 
                @SuppressWarnings("unchecked")
                public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
-                       return new Tuple2<K, Edge<K, EV>>((K) 
edge.getField(fieldPosition),     edge);
+                       return new Tuple2<>((K) edge.getField(fieldPosition), 
edge);
                }
        }
 
@@ -1087,7 +1086,7 @@ public class Graph<K, VV, EV> {
 
                @SuppressWarnings("unchecked")
                public Tuple2<K, EV> map(Edge<K, EV> edge) {
-                       return new Tuple2<K, EV>((K) 
edge.getField(fieldPosition),      edge.getValue());
+                       return new Tuple2<>((K) edge.getField(fieldPosition), 
edge.getValue());
                }
        }
 
@@ -1114,8 +1113,8 @@ public class Graph<K, VV, EV> {
                Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, 
Edge<K, EV>>> out) {
-                       out.collect(new Tuple2<K, Edge<K, 
EV>>(edge.getSource(), edge));
-                       out.collect(new Tuple2<K, Edge<K, 
EV>>(edge.getTarget(), edge));
+                       out.collect(new Tuple2<>(edge.getSource(), edge));
+                       out.collect(new Tuple2<>(edge.getTarget(), edge));
                }
        }
 
@@ -1123,8 +1122,8 @@ public class Graph<K, VV, EV> {
                Edge<K, EV>, Tuple2<K, EV>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> 
out) {
-                       out.collect(new Tuple2<K, EV>(edge.getSource(), 
edge.getValue()));
-                       out.collect(new Tuple2<K, EV>(edge.getTarget(), 
edge.getValue()));
+                       out.collect(new Tuple2<>(edge.getSource(), 
edge.getValue()));
+                       out.collect(new Tuple2<>(edge.getTarget(), 
edge.getValue()));
                }
        }
 
@@ -1132,8 +1131,8 @@ public class Graph<K, VV, EV> {
                Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
 
                public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, 
Edge<K, EV>>> out) {
-                       out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getSource(), edge.getTarget(), edge));
-                       out.collect(new Tuple3<K, K, Edge<K, 
EV>>(edge.getTarget(), edge.getSource(), edge));
+                       out.collect(new Tuple3<>(edge.getSource(), 
edge.getTarget(), edge));
+                       out.collect(new Tuple3<>(edge.getTarget(), 
edge.getSource(), edge));
                }
        }
 
@@ -1224,7 +1223,7 @@ public class Graph<K, VV, EV> {
                        implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
 
                public Edge<K, EV> map(Edge<K, EV> value) {
-                       return new Edge<K, EV>(value.f1, value.f0, value.f2);
+                       return new Edge<>(value.f1, value.f0, value.f2);
                }
        }
 
@@ -1233,8 +1232,8 @@ public class Graph<K, VV, EV> {
 
                @Override
                public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> 
out) throws Exception {
-                       out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
-                       out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
+                       out.collect(new Edge<>(edge.f0, edge.f1, edge.f2));
+                       out.collect(new Edge<>(edge.f1, edge.f0, edge.f2));
                }
        }
 
@@ -1246,7 +1245,7 @@ public class Graph<K, VV, EV> {
         */
        public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
                DataSet<Edge<K, EV>> reversedEdges = edges.map(new 
ReverseEdgesMap<K, EV>());
-               return new Graph<K, VV, EV>(vertices, reversedEdges, 
this.context);
+               return new Graph<>(vertices, reversedEdges, this.context);
        }
 
        /**
@@ -1290,7 +1289,7 @@ public class Graph<K, VV, EV> {
                        implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
                @Override
                public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
-                       return new Tuple2<K, K>(edge.f0, edge.f1);
+                       return new Tuple2<>(edge.f0, edge.f1);
                }
        }
 
@@ -1302,7 +1301,7 @@ public class Graph<K, VV, EV> {
         * @return the new graph containing the existing vertices as well as 
the one just added
         */
        public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
-               List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>();
+               List<Vertex<K, VV>> newVertex = new ArrayList<>();
                newVertex.add(vertex);
 
                return addVertices(newVertex);
@@ -1353,7 +1352,7 @@ public class Graph<K, VV, EV> {
         */
        public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> 
target, EV edgeValue) {
                Graph<K, VV, EV> partialGraph = 
fromCollection(Arrays.asList(source, target),
-                               Arrays.asList(new Edge<K, EV>(source.f0, 
target.f0, edgeValue)),
+                               Collections.singletonList(new Edge<>(source.f0, 
target.f0, edgeValue)),
                                this.context);
                return this.union(partialGraph);
        }
@@ -1408,7 +1407,7 @@ public class Graph<K, VV, EV> {
         */
        public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
 
-               List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<Vertex<K, 
VV>>();
+               List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<>();
                vertexToBeRemoved.add(vertex);
 
                return removeVertices(vertexToBeRemoved);
@@ -1445,7 +1444,7 @@ public class Graph<K, VV, EV> {
                                .join(newVertices).where(1).equalTo(0)
                                .with(new ProjectEdge<K, VV, EV>());
 
-               return new Graph<K, VV, EV>(newVertices, newEdges, context);
+               return new Graph<>(newVertices, newEdges, context);
        }
 
        private static final class VerticesRemovalCoGroup<K, VV> implements 
CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
@@ -1515,7 +1514,7 @@ public class Graph<K, VV, EV> {
                DataSet<Edge<K, EV>> newEdges = 
getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
                                .where(0,1).equalTo(0,1).with(new 
EdgeRemovalCoGroup<K, EV>());
 
-               return new Graph<K, VV, EV>(this.vertices, newEdges, context);
+               return new Graph<>(this.vertices, newEdges, context);
        }
 
        private static final class EdgeRemovalCoGroup<K,EV> implements 
CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
@@ -1541,7 +1540,7 @@ public class Graph<K, VV, EV> {
        public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
                DataSet<Vertex<K, VV>> unionedVertices = 
graph.getVertices().union(this.getVertices()).distinct();
                DataSet<Edge<K, EV>> unionedEdges = 
graph.getEdges().union(this.getEdges());
-               return new Graph<K, VV, EV>(unionedVertices, unionedEdges, 
this.context);
+               return new Graph<>(unionedVertices, unionedEdges, this.context);
        }
 
        /**
@@ -1689,7 +1688,7 @@ public class Graph<K, VV, EV> {
 
                DataSet<Vertex<K, VV>> newVertices = 
this.getVertices().runOperation(iteration);
 
-               return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
+               return new Graph<>(newVertices, this.edges, this.context);
        }
 
        /**
@@ -1706,7 +1705,7 @@ public class Graph<K, VV, EV> {
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-                       org.apache.flink.graph.gsa.GatherFunction 
gatherFunction, SumFunction<VV, EV, M> sumFunction,
+                       org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> 
gatherFunction, SumFunction<VV, EV, M> sumFunction,
                        ApplyFunction<K, VV, M> applyFunction, int 
maximumNumberOfIterations) {
 
                return this.runGatherSumApplyIteration(gatherFunction, 
sumFunction, applyFunction,
@@ -1727,7 +1726,7 @@ public class Graph<K, VV, EV> {
         * after maximumNumberOfIterations.
         */
        public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-                       org.apache.flink.graph.gsa.GatherFunction 
gatherFunction, SumFunction<VV, EV, M> sumFunction,
+                       org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> 
gatherFunction, SumFunction<VV, EV, M> sumFunction,
                        ApplyFunction<K, VV, M> applyFunction, int 
maximumNumberOfIterations,
                        GSAConfiguration parameters) {
 
@@ -1738,7 +1737,7 @@ public class Graph<K, VV, EV> {
 
                DataSet<Vertex<K, VV>> newVertices = 
vertices.runOperation(iteration);
 
-               return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
+               return new Graph<>(newVertices, this.edges, this.context);
        }
 
        /**
@@ -1776,10 +1775,10 @@ public class Graph<K, VV, EV> {
                        VertexCentricConfiguration parameters) {
 
                VertexCentricIteration<K, VV, EV, M> iteration = 
VertexCentricIteration.withEdges(
-                               edges, computeFunction, 
maximumNumberOfIterations);
+                               edges, computeFunction, combiner, 
maximumNumberOfIterations);
                iteration.configure(parameters);
                DataSet<Vertex<K, VV>> newVertices = 
this.getVertices().runOperation(iteration);
-               return new Graph<K, VV, EV>(newVertices, this.edges, 
this.context);
+               return new Graph<>(newVertices, this.edges, this.context);
        }
 
        /**
@@ -1831,14 +1830,14 @@ public class Graph<K, VV, EV> {
                                        
.join(this.vertices).where(0).equalTo(0);
                        return vertices.coGroup(edgesWithSources)
                                        .where(0).equalTo("f0.f1")
-                                       .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+                                       .with(new 
ApplyNeighborCoGroupFunction<>(neighborsFunction));
                case OUT:
                        // create <edge-targetVertex> pairs
                        DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
                                        
.join(this.vertices).where(1).equalTo(0);
                        return vertices.coGroup(edgesWithTargets)
                                        .where(0).equalTo("f0.f0")
-                                       .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
+                                       .with(new 
ApplyNeighborCoGroupFunction<>(neighborsFunction));
                case ALL:
                        // create <edge-sourceOrTargetVertex> pairs
                        DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
@@ -1848,7 +1847,7 @@ public class Graph<K, VV, EV> {
 
                        return vertices.coGroup(edgesWithNeighbors)
                                        .where(0).equalTo(0)
-                                       .with(new 
ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
+                                       .with(new 
ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
@@ -1879,14 +1878,14 @@ public class Graph<K, VV, EV> {
                                                
.join(this.vertices).where(0).equalTo(0);
                                return vertices.coGroup(edgesWithSources)
                                                .where(0).equalTo("f0.f1")
-                                               .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                               .with(new 
ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
                        case OUT:
                                // create <edge-targetVertex> pairs
                                DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
                                                
.join(this.vertices).where(1).equalTo(0);
                                return vertices.coGroup(edgesWithTargets)
                                                .where(0).equalTo("f0.f0")
-                                               .with(new 
ApplyNeighborCoGroupFunction<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                               .with(new 
ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo);
                        case ALL:
                                // create <edge-sourceOrTargetVertex> pairs
                                DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
@@ -1896,7 +1895,7 @@ public class Graph<K, VV, EV> {
 
                                return vertices.coGroup(edgesWithNeighbors)
                                                .where(0).equalTo(0)
-                                               .with(new 
ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                               .with(new 
ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo);
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
@@ -1927,7 +1926,7 @@ public class Graph<K, VV, EV> {
                                        .with(new ProjectVertexIdJoin<K, VV, 
EV>(1))
                                        .withForwardedFieldsFirst("f1->f0");
                        return edgesWithSources.groupBy(0).reduceGroup(
-                                       new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
+                               new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction));
                case OUT:
                        // create <edge-targetVertex> pairs
                        DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
@@ -1935,7 +1934,7 @@ public class Graph<K, VV, EV> {
                                        .with(new ProjectVertexIdJoin<K, VV, 
EV>(0))
                                        .withForwardedFieldsFirst("f0");
                        return edgesWithTargets.groupBy(0).reduceGroup(
-                                       new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
+                               new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction));
                case ALL:
                        // create <edge-sourceOrTargetVertex> pairs
                        DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
@@ -1944,7 +1943,7 @@ public class Graph<K, VV, EV> {
                                        .with(new ProjectEdgeWithNeighbor<K, 
VV, EV>());
 
                        return edgesWithNeighbors.groupBy(0).reduceGroup(
-                                       new ApplyNeighborGroupReduceFunction<K, 
VV, EV, T>(neighborsFunction));
+                               new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction));
                default:
                        throw new IllegalArgumentException("Illegal edge 
direction");
                }
@@ -1976,7 +1975,7 @@ public class Graph<K, VV, EV> {
                                                .with(new 
ProjectVertexIdJoin<K, VV, EV>(1))
                                                
.withForwardedFieldsFirst("f1->f0");
                                return edgesWithSources.groupBy(0).reduceGroup(
-                                               new 
ApplyNeighborGroupReduceFunction<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                       new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
                        case OUT:
                                // create <edge-targetVertex> pairs
                                DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithTargets = edges
@@ -1984,7 +1983,7 @@ public class Graph<K, VV, EV> {
                                                .with(new 
ProjectVertexIdJoin<K, VV, EV>(0))
                                                .withForwardedFieldsFirst("f0");
                                return edgesWithTargets.groupBy(0).reduceGroup(
-                                               new 
ApplyNeighborGroupReduceFunction<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                       new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
                        case ALL:
                                // create <edge-sourceOrTargetVertex> pairs
                                DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> 
edgesWithNeighbors = edges
@@ -1993,7 +1992,7 @@ public class Graph<K, VV, EV> {
                                                .with(new 
ProjectEdgeWithNeighbor<K, VV, EV>());
 
                                return 
edgesWithNeighbors.groupBy(0).reduceGroup(
-                                               new 
ApplyNeighborGroupReduceFunction<K, VV, EV, 
T>(neighborsFunction)).returns(typeInfo);
+                                       new 
ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo);
                        default:
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
@@ -2031,7 +2030,7 @@ public class Graph<K, VV, EV> {
                @SuppressWarnings("unchecked")
                public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
                                Collector<Tuple2<K, VV>> out) {
-                       out.collect(new Tuple2<K, VV>((K) 
edge.getField(fieldPosition), otherVertex.getValue()));
+                       out.collect(new Tuple2<>((K) 
edge.getField(fieldPosition), otherVertex.getValue()));
                }
        }
 
@@ -2047,7 +2046,7 @@ public class Graph<K, VV, EV> {
                @SuppressWarnings("unchecked")
                public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
                                                Collector<Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>> out) {
-                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
+                       out.collect(new Tuple3<>((K) 
edge.getField(fieldPosition), edge, otherVertex));
                }
        }
 
@@ -2059,7 +2058,7 @@ public class Graph<K, VV, EV> {
                public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, 
Vertex<K, VV> neighbor,
                                Collector<Tuple2<K, VV>> out) {
 
-                       out.collect(new Tuple2<K, VV>(keysWithEdge.f0, 
neighbor.getValue()));
+                       out.collect(new Tuple2<>(keysWithEdge.f0, 
neighbor.getValue()));
                }
        }
 
@@ -2070,7 +2069,7 @@ public class Graph<K, VV, EV> {
 
                public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, 
Vertex<K, VV> neighbor,
                                                Collector<Tuple3<K, Edge<K, 
EV>, Vertex<K, VV>>> out) {
-                       out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, 
VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
+                       out.collect(new Tuple3<>(keysWithEdge.f0, 
keysWithEdge.f2, neighbor));
                }
        }
 
@@ -2119,7 +2118,7 @@ public class Graph<K, VV, EV> {
                                @Override
                                public Tuple2<Edge<K, EV>, Vertex<K, VV>> 
next() {
                                        Tuple3<K, Edge<K, EV>, Vertex<K, VV>> 
next = keysWithEdgesIterator.next();
-                                       return new Tuple2<Edge<K, EV>, 
Vertex<K, VV>>(next.f1, next.f2);
+                                       return new Tuple2<>(next.f1, next.f2);
                                }
 
                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
index 08cf011..8ec9650 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
@@ -26,5 +26,5 @@ package org.apache.flink.graph;
  */
 public interface GraphAlgorithm<K, VV, EV, T> {
 
-       public T run(Graph<K, VV, EV> input) throws Exception;
+       T run(Graph<K, VV, EV> input) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index f93abc4..4859e12 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -104,7 +104,7 @@ public class GraphCsvReader {
        public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> 
vertexValue,
                        Class<EV> edgeValue) {
 
-               DataSet<Tuple2<K, VV>> vertices = null;
+               DataSet<Tuple2<K, VV>> vertices;
 
                if (edgeReader == null) {
                        throw new RuntimeException("The edges input file cannot 
be null!");
@@ -160,9 +160,9 @@ public class GraphCsvReader {
                                        private static final long 
serialVersionUID = -2981792951286476970L;
 
                                        public Tuple3<K, K, NullValue> 
map(Tuple2<K, K> edge) {
-                                               return new Tuple3<K, K, 
NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+                                               return new Tuple3<>(edge.f0, 
edge.f1, NullValue.getInstance());
                                        }
-                               }).withForwardedFields("f0;f1");;
+                               }).withForwardedFields("f0;f1");
 
                return Graph.fromTupleDataSet(edges, executionContext);
        }
@@ -179,7 +179,7 @@ public class GraphCsvReader {
        @SuppressWarnings({ "serial", "unchecked" })
        public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, 
Class<VV> vertexValue) {
                
-               DataSet<Tuple2<K, VV>> vertices = null;
+               DataSet<Tuple2<K, VV>> vertices;
 
                if (edgeReader == null) {
                        throw new RuntimeException("The edges input file cannot 
be null!");
@@ -189,7 +189,7 @@ public class GraphCsvReader {
                                .map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, 
NullValue>>() {
 
                                        public Tuple3<K, K, NullValue> 
map(Tuple2<K, K> input) {
-                                               return new Tuple3<K, K, 
NullValue>(input.f0, input.f1, NullValue.getInstance());
+                                               return new Tuple3<>(input.f0, 
input.f1, NullValue.getInstance());
                                        }
                                }).withForwardedFields("f0;f1");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 0b98a27..964d20e 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -37,7 +37,7 @@ public abstract class IterationConfiguration {
        private int parallelism = -1;
 
        /** the iteration aggregators **/
-       private Map<String, Aggregator<?>> aggregators = new HashMap<String, 
Aggregator<?>>();
+       private Map<String, Aggregator<?>> aggregators = new HashMap<>();
 
        /** flag that defines whether the solution set is kept in managed 
memory **/
        private boolean unmanagedSolutionSet = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index dee3480..2ae0903 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -64,14 +64,14 @@ public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, 
VV, EV> {
        }
 
        public Vertex<K, VV> getSrcVertex() {
-               return new Vertex<K, VV>(this.f0, this.f2);
+               return new Vertex<>(this.f0, this.f2);
        }
 
        public Vertex<K, VV> getTrgVertex() {
-               return new Vertex<K, VV>(this.f1, this.f3);
+               return new Vertex<>(this.f1, this.f3);
        }
 
        public Edge<K, EV> getEdge() {
-               return new Edge<K, EV>(this.f0, this.f1, this.f4);
+               return new Edge<>(this.f0, this.f1, this.f4);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
index a30d1a2..f40dac9 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.DataSet;
+
+import java.io.Serializable;
 
 /**
  * Interface to be implemented by the transformation function

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index e9add7c..f05c254 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -105,7 +105,7 @@ public abstract class ApplyFunction<K, VV, M> implements 
Serializable {
         * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
         */
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
 
        /**
@@ -115,7 +115,7 @@ public abstract class ApplyFunction<K, VV, M> implements 
Serializable {
         * @return The aggregated value of the previous iteration.
         */
        public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
 
        /**
@@ -126,7 +126,7 @@ public abstract class ApplyFunction<K, VV, M> implements 
Serializable {
         * @return The broadcast data set.
         */
        public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
index 8d24f16..079b4c7 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java
@@ -39,13 +39,13 @@ import java.util.List;
 public class GSAConfiguration extends IterationConfiguration {
 
        /** the broadcast variables for the gather function **/
-       private List<Tuple2<String, DataSet<?>>> bcVarsGather = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       private List<Tuple2<String, DataSet<?>>> bcVarsGather = new 
ArrayList<>();
 
        /** the broadcast variables for the sum function **/
-       private List<Tuple2<String, DataSet<?>>> bcVarsSum = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<>();
 
        /** the broadcast variables for the apply function **/
-       private List<Tuple2<String, DataSet<?>>> bcVarsApply = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       private List<Tuple2<String, DataSet<?>>> bcVarsApply = new 
ArrayList<>();
 
        private EdgeDirection direction = EdgeDirection.OUT;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index d914f2a..90db9da 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -94,7 +94,7 @@ public abstract class GatherFunction<VV, EV, M> implements 
Serializable {
         * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
         */
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
 
        /**
@@ -104,7 +104,7 @@ public abstract class GatherFunction<VV, EV, M> implements 
Serializable {
         * @return The aggregated value of the previous iteration.
         */
        public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
 
        /**
@@ -115,7 +115,7 @@ public abstract class GatherFunction<VV, EV, M> implements 
Serializable {
         * @return The broadcast data set.
         */
        public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index d1b12f9..442ce68 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.types.LongValue;
@@ -119,13 +118,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                // Prepare type information
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
vertexDataSet.getType()).getTypeAt(0);
                TypeInformation<M> messageType = 
TypeExtractor.createTypeInfo(gather, GatherFunction.class, gather.getClass(), 
2);
-               TypeInformation<Tuple2<K, M>> innerType = new 
TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+               TypeInformation<Tuple2<K, M>> innerType = new 
TupleTypeInfo<>(keyType, messageType);
                TypeInformation<Vertex<K, VV>> outputType = 
vertexDataSet.getType();
 
-               // create a graph
-               Graph<K, VV, EV> graph =
-                               Graph.fromDataSet(vertexDataSet, edgeDataSet, 
vertexDataSet.getExecutionEnvironment());
-
                // check whether the numVertices option is set and, if so, 
compute the total number of vertices
                // and set it within the gather, sum and apply functions
 
@@ -139,9 +134,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                }
 
                // Prepare UDFs
-               GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, 
M>(gather, innerType);
-               SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, 
innerType);
-               ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, 
M>(apply, outputType);
+               GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<>(gather, 
innerType);
+               SumUdf<K, VV, EV, M> sumUdf = new SumUdf<>(sum, innerType);
+               ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<>(apply, 
outputType);
 
                final int[] zeroKeyPos = new int[] {0};
                final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
@@ -268,11 +263,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
         *
         * @return An in stance of the gather-sum-apply graph computation 
operator.
         */
-       public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+       public static <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
                withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> 
gather,
                SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int 
maximumNumberOfIterations) {
 
-               return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, 
apply, edges, maximumNumberOfIterations);
+               return new GatherSumApplyIteration<>(gather, sum, apply, edges, 
maximumNumberOfIterations);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -295,7 +290,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                @Override
                public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> 
neighborTuple) {
                        M result = this.gatherFunction.gather(neighborTuple.f1);
-                       return new Tuple2<K, M>(neighborTuple.f0, result);
+                       return new Tuple2<>(neighborTuple.f0, result);
                }
 
                @Override
@@ -337,7 +332,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> 
arg1) throws Exception {
                        K key = arg0.f0;
                        M result = this.sumFunction.sum(arg0.f1, arg1.f1);
-                       return new Tuple2<K, M>(key, result);
+                       return new Tuple2<>(key, result);
                }
 
                @Override
@@ -411,8 +406,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                        Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, 
EV>>> {
 
                public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, 
Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-                       out.collect(new Tuple2<K, Neighbor<VV, EV>>(
-                                       edge.getTarget(), new Neighbor<VV, 
EV>(vertex.getValue(), edge.getValue())));
+                       out.collect(new Tuple2<>(
+                                       edge.getTarget(), new 
Neighbor<>(vertex.getValue(), edge.getValue())));
                }
        }
 
@@ -422,8 +417,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> 
implements CustomUnaryOperati
                        Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, 
EV>>> {
 
                public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, 
Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-                       out.collect(new Tuple2<K, Neighbor<VV, EV>>(
-                                       edge.getSource(), new Neighbor<VV, 
EV>(vertex.getValue(), edge.getValue())));
+                       out.collect(new Tuple2<>(
+                                       edge.getSource(), new 
Neighbor<>(vertex.getValue(), edge.getValue())));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 68e8d27..e70af1f 100644
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -94,7 +94,7 @@ public abstract class SumFunction<VV, EV, M> implements 
Serializable {
         * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
         */
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
 
        /**
@@ -104,7 +104,7 @@ public abstract class SumFunction<VV, EV, M> implements 
Serializable {
         * @return The aggregated value of the previous iteration.
         */
        public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
 
        /**
@@ -115,7 +115,7 @@ public abstract class SumFunction<VV, EV, M> implements 
Serializable {
         * @return The broadcast data set.
         */
        public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index f39d858..4656757 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class GSASingleSourceShortestPaths<K> implements
        @Override
        public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
-               return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+               return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
                                .runGatherSumApplyIteration(new 
CalculateDistances(), new ChooseMinDistance(),
                                                new UpdateDistance<K>(), 
maxIterations)
                                                .getVertices();
@@ -85,7 +85,7 @@ public class GSASingleSourceShortestPaths<K> implements
                public Double gather(Neighbor<Double, Double> neighbor) {
                        return neighbor.getNeighborValue() + 
neighbor.getEdgeValue();
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class ChooseMinDistance extends 
SumFunction<Double, Double, Double> {
@@ -93,7 +93,7 @@ public class GSASingleSourceShortestPaths<K> implements
                public Double sum(Double newValue, Double currentValue) {
                        return Math.min(newValue, currentValue);
                }
-       };
+       }
 
        @SuppressWarnings("serial")
        private static final class UpdateDistance<K> extends ApplyFunction<K, 
Double, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2d13dfd..96e5afc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -112,7 +112,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, 
EV>
        public static final class UpdateVertexLabel<K, VV extends 
Comparable<VV>> extends GatherFunction<K, VV, VV> {
 
                public void updateVertex(Vertex<K, VV> vertex, 
MessageIterator<VV> inMessages) {
-                       Map<VV, Long> labelsWithFrequencies = new HashMap<VV, 
Long>();
+                       Map<VV, Long> labelsWithFrequencies = new HashMap<>();
 
                        long maxFrequency = 1;
                        VV mostFrequentLabel = vertex.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 4ff4e79..5aa1ac1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -51,7 +51,7 @@ public class SingleSourceShortestPaths<K> implements 
GraphAlgorithm<K, Double, D
        @Override
        public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
-               return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+               return input.mapVertices(new InitVerticesMapper<>(srcVertexId))
                                .runScatterGatherIteration(new 
MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(),
                                maxIterations).getVertices();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index 681d060..dabeb06 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -110,7 +110,7 @@ public class TriangleEnumerator<K extends Comparable<K>, 
VV, EV> implements
        private static final class DegreeCounter<K extends Comparable<K>, EV>
                        implements GroupReduceFunction<Edge<K, EV>, 
EdgeWithDegrees<K>> {
 
-               final ArrayList<K> otherVertices = new ArrayList<K>();
+               final ArrayList<K> otherVertices = new ArrayList<>();
                final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>();
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index db64f63..08c15e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.DataSet;
@@ -33,6 +29,10 @@ import org.apache.flink.types.Either;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
 /**
  * The base class for the message-passing functions between vertices as a part 
of a {@link VertexCentricIteration}.
  * 
@@ -86,7 +86,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         */
        public final Iterable<Edge<K, EV>> getEdges() {
                verifyEdgeUsage();
-               this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+               this.edgeIterator.set(edges);
                return this.edgeIterator;
        }
 
@@ -100,7 +100,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
                verifyEdgeUsage();
                outMsg.setField(m, 1);
                while (edges.hasNext()) {
-                       Tuple next = (Tuple) edges.next();
+                       Tuple next = edges.next();
                        outMsg.setField(next.getField(1), 0);
                        out.collect(Either.Right(outMsg));
                }
@@ -157,7 +157,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         * @return The aggregator registered under this name, or {@code null}, 
if no aggregator was registered.
         */
        public final <T extends Aggregator<?>> T getIterationAggregator(String 
name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
        
        /**
@@ -167,7 +167,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         * @return The aggregated value of the previous iteration.
         */
        public final <T extends Value> T getPreviousIterationAggregate(String 
name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
        
        /**
@@ -179,7 +179,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
         * @return The broadcast data set.
         */
        public final <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -204,9 +204,9 @@ public abstract class ComputeFunction<K, VV, EV, Message> 
implements Serializabl
        
        void init(IterationRuntimeContext context) {
                this.runtimeContext = context;
-               this.outVertex = new Vertex<K, VV>();
-               this.outMsg = new Tuple2<K, Message>();
-               this.edgeIterator = new EdgesIterator<K, EV>();
+               this.outVertex = new Vertex<>();
+               this.outMsg = new Tuple2<>();
+               this.edgeIterator = new EdgesIterator<>();
        }
        
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 70c8262..9398d8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
- * The base class for combining messages sent during a {@link 
VertexCentricteration}.
+ * The base class for combining messages sent during a {@link 
VertexCentricIteration}.
  * 
  * @param <K> The type of the vertex id
  * @param <Message> The type of the message sent between vertices along the 
edges.
@@ -37,15 +37,12 @@ public abstract class MessageCombiner<K, Message> 
implements Serializable {
 
        private Collector<Tuple2<K, Either<NullValue, Message>>> out;
 
-       private K vertexId;
-
        private Tuple2<K, Either<NullValue, Message>> outValue;
 
        void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> 
collector) {
-               this.vertexId = target;
                this.out = collector;
-               this.outValue = new Tuple2<K, Either<NullValue, Message>>();
-               outValue.setField(vertexId, 0);
+               this.outValue = new Tuple2<>();
+               outValue.setField(target, 0);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index 026e869..a0f793a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -37,7 +37,7 @@ import java.util.List;
 public class VertexCentricConfiguration extends IterationConfiguration {
 
        /** the broadcast variables for the compute function **/
-       private List<Tuple2<String, DataSet<?>>> bcVars = new 
ArrayList<Tuple2<String,DataSet<?>>>();
+       private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>();
 
        public VertexCentricConfiguration() {}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
index 8d96776..ebf2e8d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java
@@ -18,24 +18,21 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupCombineFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CoGroupOperator;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -50,6 +47,9 @@ import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * This class represents iterative graph computations, programmed in a 
vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The 
paradigm has also been
@@ -158,14 +158,14 @@ public class VertexCentricIteration<K, VV, EV, Message>
                // prepare the type information
                TypeInformation<K> keyType = ((TupleTypeInfo<?>) 
initialVertices.getType()).getTypeAt(0);
                TypeInformation<Tuple2<K, Message>> messageTypeInfo =
-                       new TupleTypeInfo<Tuple2<K, Message>>(keyType, 
messageType);
+                       new TupleTypeInfo<>(keyType, messageType);
                TypeInformation<Vertex<K, VV>> vertexType = 
initialVertices.getType();
                TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> 
intermediateTypeInfo =
-                       new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, 
Message>>(vertexType, messageTypeInfo);
+                       new EitherTypeInfo<>(vertexType, messageTypeInfo);
                TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo 
=
-                               new EitherTypeInfo<NullValue, 
Message>(TypeExtractor.getForClass(NullValue.class), messageType);
+                       new 
EitherTypeInfo<>(TypeExtractor.getForClass(NullValue.class), messageType);
                TypeInformation<Tuple2<K, Either<NullValue, Message>>> 
workSetTypeInfo =
-                       new TupleTypeInfo<Tuple2<K, Either<NullValue, 
Message>>>(keyType, nullableMsgTypeInfo);
+                       new TupleTypeInfo<>(keyType, nullableMsgTypeInfo);
 
                DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = 
initialVertices.map(
                                new InitializeWorkSet<K, VV, 
Message>()).returns(workSetTypeInfo);
@@ -183,7 +183,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
                                                vertexType, 
nullableMsgTypeInfo));
 
                VertexComputeUdf<K, VV, EV, Message> vertexUdf =
-                               new VertexComputeUdf<K, VV, EV, 
Message>(computeFunction, intermediateTypeInfo); 
+                       new VertexComputeUdf<>(computeFunction, 
intermediateTypeInfo);
 
                CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, 
Message>>> superstepComputation =
                                verticesWithMsgs.coGroup(edgesWithValue)
@@ -204,7 +204,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
                if (combineFunction != null) {
 
                        MessageCombinerUdf<K, Message> combinerUdf =
-                                       new MessageCombinerUdf<K, 
Message>(combineFunction, workSetTypeInfo);
+                               new MessageCombinerUdf<>(combineFunction, 
workSetTypeInfo);
 
                        DataSet<Tuple2<K, Either<NullValue, Message>>> 
combinedMessages = allMessages
                                        .groupBy(0).reduceGroup(combinerUdf)
@@ -237,12 +237,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
         * 
         * @return An instance of the vertex-centric graph computation operator.
         */
-       public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, 
EV, Message> withEdges(
+       public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, 
Message> withEdges(
                DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, 
Message> cf,
                int maximumNumberOfIterations) {
 
-               return new VertexCentricIteration<K, VV, EV, Message>(cf, 
edgesWithValue, null,
-                               maximumNumberOfIterations);
+               return new VertexCentricIteration<>(cf, edgesWithValue, null,
+                       maximumNumberOfIterations);
        }
 
        /**
@@ -260,12 +260,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
         * 
         * @return An instance of the vertex-centric graph computation operator.
         */
-       public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, 
EV, Message> withEdges(
+       public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, 
Message> withEdges(
                DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, 
Message> cf,
                MessageCombiner<K, Message> mc, int maximumNumberOfIterations) {
 
-               return new VertexCentricIteration<K, VV, EV, Message>(cf, 
edgesWithValue, mc,
-                               maximumNumberOfIterations);
+               return new VertexCentricIteration<>(cf, edgesWithValue, mc,
+                       maximumNumberOfIterations);
        }
 
        /**
@@ -297,7 +297,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
 
                @Override
                public void open(Configuration parameters) {
-                       outTuple = new Tuple2<K, Either<NullValue, Message>>();
+                       outTuple = new Tuple2<>();
                        nullMessage = Either.Left(NullValue.getInstance());
                        outTuple.setField(nullMessage, 1);
                }
@@ -308,12 +308,12 @@ public class VertexCentricIteration<K, VV, EV, Message>
                }
 }
        
-       @SuppressWarnings("serial")
        /**
         * This coGroup class wraps the user-defined compute function.
         * The first input holds a Tuple2 containing the vertex state and its 
inbox.
         * The second input is an iterator of the out-going edges of this 
vertex.
         */
+       @SuppressWarnings("serial")
        private static class VertexComputeUdf<K, VV, EV, Message> extends 
RichCoGroupFunction<
                Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>,
                Either<Vertex<K, VV>, Tuple2<K, Message>>>
@@ -469,8 +469,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
                        JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, 
Message>>,
                                        Tuple2<Vertex<K, VV>, Either<NullValue, 
Message>>> {
 
-               private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> 
outTuple =
-                               new Tuple2<Vertex<K, VV>, Either<NullValue, 
Message>>();
+               private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> 
outTuple = new Tuple2<>();
 
                public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join(
                                Vertex<K, VV> vertex, Tuple2<K, 
Either<NullValue, Message>> message) {
@@ -498,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message>
        private static final class ProjectMessages<K, VV, Message> implements
                        FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, 
Message>>, Tuple2<K, Either<NullValue, Message>>> {
 
-               private Tuple2<K, Either<NullValue, Message>> outTuple = new 
Tuple2<K, Either<NullValue, Message>>();
+               private Tuple2<K, Either<NullValue, Message>> outTuple = new 
Tuple2<>();
 
                public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> 
value,
                                Collector<Tuple2<K, Either<NullValue, 
Message>>> out) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
index d56c0da..93b3a8c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java
@@ -141,7 +141,7 @@ public abstract class GatherFunction<K, VV, Message> 
implements Serializable {
         * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
         */
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
 
        /**
@@ -151,7 +151,7 @@ public abstract class GatherFunction<K, VV, Message> 
implements Serializable {
         * @return The aggregated value of the previous iteration.
         */
        public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
 
        /**
@@ -163,7 +163,7 @@ public abstract class GatherFunction<K, VV, Message> 
implements Serializable {
         * @return The broadcast data set.
         */
        public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -243,8 +243,8 @@ public abstract class GatherFunction<K, VV, Message> 
implements Serializable {
        <VertexWithDegree> void 
updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState,
                                                                                
                MessageIterator<Message> inMessages) throws Exception {
 
-               Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
-                               ((Tuple3<VV, Long, 
Long>)vertexState.getValue()).f0);
+               Vertex<K, VV> vertex = new Vertex<>(vertexState.f0,
+                       ((Tuple3<VV, Long, Long>) vertexState.getValue()).f0);
 
                updateVertex(vertex, inMessages);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
index 336e73d..b99b5b7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java
@@ -217,7 +217,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> 
implements Serializabl
         * @return The aggregator registered under this name, or null, if no 
aggregator was registered.
         */
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-               return this.runtimeContext.<T>getIterationAggregator(name);
+               return this.runtimeContext.getIterationAggregator(name);
        }
 
        /**
@@ -227,7 +227,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> 
implements Serializabl
         * @return The aggregated value of the previous iteration.
         */
        public <T extends Value> T getPreviousIterationAggregate(String name) {
-               return 
this.runtimeContext.<T>getPreviousIterationAggregate(name);
+               return this.runtimeContext.getPreviousIterationAggregate(name);
        }
 
        /**
@@ -239,7 +239,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> 
implements Serializabl
         * @return The broadcast data set.
         */
        public <T> Collection<T> getBroadcastSet(String name) {
-               return this.runtimeContext.<T>getBroadcastVariable(name);
+               return this.runtimeContext.getBroadcastVariable(name);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -266,8 +266,8 @@ public abstract class ScatterFunction<K, VV, Message, EV> 
implements Serializabl
 
        void init(IterationRuntimeContext context) {
                this.runtimeContext = context;
-               this.outValue = new Tuple2<K, Message>();
-               this.edgeIterator = new EdgesIterator<K, EV>();
+               this.outValue = new Tuple2<>();
+               this.edgeIterator = new EdgesIterator<>();
        }
 
        void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) {
@@ -282,7 +282,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> 
implements Serializabl
        {
                private Iterator<Edge<K, EV>> input;
 
-               private Edge<K, EV> edge = new Edge<K, EV>();
+               private Edge<K, EV> edge = new Edge<>();
 
                void set(Iterator<Edge<K, EV>> input) {
                        this.input = input;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index fde305f..8049932 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -196,7 +196,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
         * 
         * @return An in stance of the scatter-gather graph computation 
operator.
         */
-       public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, 
Message, EV> withEdges(
+       public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, 
Message, EV> withEdges(
                DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, 
Message, EV> sf,
                GatherFunction<K, VV, Message> gf, int 
maximumNumberOfIterations)
        {
@@ -588,8 +588,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
                                throw new IllegalArgumentException("Illegal 
edge direction");
                }
 
-               GatherUdf<K, VV, Message> updateUdf =
-                               new GatherUdfSimpleVV<K, VV, 
Message>(gatherFunction, vertexTypes);
+               GatherUdf<K, VV, Message> updateUdf = new 
GatherUdfSimpleVV<>(gatherFunction, vertexTypes);
 
                // build the update function (co group)
                CoGroupOperator<?, ?, Vertex<K, VV>> updates =

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 33d469b..b620dd8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -50,8 +50,8 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends 
GraphValidator<K, VV,
 
        private static final class MapEdgeIds<K, EV> implements 
FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
                public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) 
{
-                       out.collect(new Tuple1<K>(edge.f0));
-                       out.collect(new Tuple1<K>(edge.f1));
+                       out.collect(new Tuple1<>(edge.f0));
+                       out.collect(new Tuple1<>(edge.f1));
                }
        }
 
@@ -67,7 +67,7 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV,
 
        private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
                public Tuple1<K> map(K key) throws Exception {
-                       return new Tuple1<K>(key);
+                       return new Tuple1<>(key);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
index 0cf026c..ba6bd05 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java
@@ -33,7 +33,6 @@ import org.junit.Test;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
 import static org.junit.Assert.assertEquals;
 
 public class TranslateTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 1e44d5b..98ecf16 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
@@ -240,10 +239,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("gatherBcastSet");
-                       Assert.assertEquals(1, bcastSet.get(0));
-                       Assert.assertEquals(2, bcastSet.get(1));
-                       Assert.assertEquals(3, bcastSet.get(2));
+                       List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet");
+                       Assert.assertEquals(1, bcastSet.get(0).intValue());
+                       Assert.assertEquals(2, bcastSet.get(1).intValue());
+                       Assert.assertEquals(3, bcastSet.get(2).intValue());
 
                        // test aggregator
                        if (getSuperstepNumber() == 2) {
@@ -271,10 +270,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("sumBcastSet");
-                       Assert.assertEquals(4, bcastSet.get(0));
-                       Assert.assertEquals(5, bcastSet.get(1));
-                       Assert.assertEquals(6, bcastSet.get(2));
+                       List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet");
+                       Assert.assertEquals(4, bcastSet.get(0).intValue());
+                       Assert.assertEquals(5, bcastSet.get(1).intValue());
+                       Assert.assertEquals(6, bcastSet.get(2).intValue());
 
                        // test aggregator
                        aggregator = getIterationAggregator("superstepAggregator");
@@ -286,7 +285,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
                public Long sum(Long newValue, Long currentValue) {
                        long superstep = getSuperstepNumber();
                        aggregator.aggregate(superstep);
-                       return 0l;
+                       return 0L;
                }
        }
 
@@ -300,10 +299,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("applyBcastSet");
-                       Assert.assertEquals(7, bcastSet.get(0));
-                       Assert.assertEquals(8, bcastSet.get(1));
-                       Assert.assertEquals(9, bcastSet.get(2));
+                       List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet");
+                       Assert.assertEquals(7, bcastSet.get(0).intValue());
+                       Assert.assertEquals(8, bcastSet.get(1).intValue());
+                       Assert.assertEquals(9, bcastSet.get(2).intValue());
 
                        // test aggregator
                        aggregator = getIterationAggregator("superstepAggregator");
@@ -338,7 +337,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
        private static final class DummySum extends SumFunction<Long, Long, Long> {
 
                public Long sum(Long newValue, Long currentValue) {
-                       return 0l;
+                       return 0L;
                }
        }
 
@@ -354,7 +353,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
        public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
                public Long map(Vertex<Long, Long> value) {
-                       return 1l;
+                       return 1L;
                }
        }
 
@@ -363,7 +362,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 
                @Override
                public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
-                       HashSet<Long> h = new HashSet<Long>();
+                       HashSet<Long> h = new HashSet<>();
                        h.add(value.getId());
                        return h;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
index fcd0d82..2213700 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
@@ -521,10 +520,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
-                       Assert.assertEquals(4, bcastSet.get(0));
-                       Assert.assertEquals(5, bcastSet.get(1));
-                       Assert.assertEquals(6, bcastSet.get(2));
+                       List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet");
+                       Assert.assertEquals(4, bcastSet.get(0).intValue());
+                       Assert.assertEquals(5, bcastSet.get(1).intValue());
+                       Assert.assertEquals(6, bcastSet.get(2).intValue());
 
                        // test number of vertices
                        Assert.assertEquals(5, getNumberOfVertices());
@@ -569,10 +568,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase {
 
                        // test bcast variable
                        @SuppressWarnings("unchecked")
-                       List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
-                       Assert.assertEquals(1, bcastSet.get(0));
-                       Assert.assertEquals(2, bcastSet.get(1));
-                       Assert.assertEquals(3, bcastSet.get(2));
+                       List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet");
+                       Assert.assertEquals(1, bcastSet.get(0).intValue());
+                       Assert.assertEquals(2, bcastSet.get(1).intValue());
+                       Assert.assertEquals(3, bcastSet.get(2).intValue());
 
                        // test aggregator
                        aggregator = getIterationAggregator("superstepAggregator");

http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
index 99c66ec..3ccdef0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.test.operations;
 
-import com.google.common.base.Charsets;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -30,10 +29,12 @@ import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -193,7 +194,7 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
                tempFile.deleteOnExit();
 
                OutputStreamWriter wrt = new OutputStreamWriter(
-                               new FileOutputStream(tempFile), Charsets.UTF_8
+                               new FileOutputStream(tempFile), Charset.forName("UTF-8")
                );
                wrt.write(content);
                wrt.close();

Reply via email to