[FLINK-4729] [gelly] Use optional VertexCentric CombineFunction Passes through the CombineFunction to VertexCentricIteration, and other code cleanup discovered via IntelliJ's code analyzer.
This closes #2587 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb34133e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb34133e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb34133e Branch: refs/heads/master Commit: bb34133ed30b2a6c74baa0e5342e278e039cbe2a Parents: 8c7c42f Author: Greg Hogan <[email protected]> Authored: Mon Oct 3 13:59:38 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Wed Oct 5 12:26:16 2016 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Edge.java | 2 +- .../main/java/org/apache/flink/graph/Graph.java | 165 +++++++++---------- .../org/apache/flink/graph/GraphAlgorithm.java | 2 +- .../org/apache/flink/graph/GraphCsvReader.java | 10 +- .../flink/graph/IterationConfiguration.java | 2 +- .../java/org/apache/flink/graph/Triplet.java | 6 +- .../apache/flink/graph/VertexJoinFunction.java | 5 +- .../apache/flink/graph/gsa/ApplyFunction.java | 6 +- .../flink/graph/gsa/GSAConfiguration.java | 6 +- .../apache/flink/graph/gsa/GatherFunction.java | 6 +- .../graph/gsa/GatherSumApplyIteration.java | 29 ++-- .../org/apache/flink/graph/gsa/SumFunction.java | 6 +- .../library/GSASingleSourceShortestPaths.java | 6 +- .../flink/graph/library/LabelPropagation.java | 2 +- .../library/SingleSourceShortestPaths.java | 2 +- .../flink/graph/library/TriangleEnumerator.java | 2 +- .../flink/graph/pregel/ComputeFunction.java | 24 +-- .../flink/graph/pregel/MessageCombiner.java | 13 +- .../pregel/VertexCentricConfiguration.java | 2 +- .../graph/pregel/VertexCentricIteration.java | 45 +++-- .../flink/graph/spargel/GatherFunction.java | 10 +- .../flink/graph/spargel/ScatterFunction.java | 12 +- .../graph/spargel/ScatterGatherIteration.java | 5 +- .../validation/InvalidVertexIdsValidator.java | 6 +- .../graph/asm/translate/TranslateTest.java | 1 - .../test/GatherSumApplyConfigurationITCase.java | 33 ++-- .../test/ScatterGatherConfigurationITCase.java | 17 +- .../operations/GraphCreationWithCsvITCase.java | 5 +- .../test/operations/GraphOperationsITCase.java | 55 +++---- .../ReduceOnNeighborMethodsITCase.java | 5 +- 30 files changed, 235 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java index d84badb..2bcce29 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java @@ -46,7 +46,7 @@ public class Edge<K, V> extends Tuple3<K, K, V>{ * and the target is the original Edge's source. */ public Edge<K, V> reverse() { - return new Edge<K, V>(this.f1, this.f0, this.f2); + return new Edge<>(this.f1, this.f0, this.f2); } public void setSource(K src) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java index 821b0a7..02d1eeb 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java @@ -66,6 +66,7 @@ import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -160,7 +161,7 @@ public class Graph<K, VV, EV> { public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) { - return new Graph<K, VV, EV>(vertices, edges, context); + return new Graph<>(vertices, edges, context); } /** @@ -177,15 +178,15 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); - return new Graph<K, NullValue, EV>(vertices, edges, context); + return new Graph<>(vertices, edges, context); } private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction< Edge<K, EV>, Vertex<K, NullValue>> { public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) { - out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance())); - out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance())); + out.collect(new Vertex<>(edge.f0, NullValue.getInstance())); + out.collect(new Vertex<>(edge.f1, NullValue.getInstance())); } } @@ -216,19 +217,19 @@ public class Graph<K, VV, EV> { .flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct() .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() { public Vertex<K, VV> map(Tuple1<K> value) throws Exception { - return new Vertex<K, VV>(value.f0, vertexValueInitializer.map(value.f0)); + return new Vertex<>(value.f0, vertexValueInitializer.map(value.f0)); } }).returns(returnType).withForwardedFields("f0"); - return new Graph<K, VV, EV>(vertices, edges, context); + return new Graph<>(vertices, edges, context); } private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction< Edge<K, EV>, Tuple1<K>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { - out.collect(new Tuple1<K>(edge.f0)); - out.collect(new Tuple1<K>(edge.f1)); + out.collect(new Tuple1<>(edge.f0)); + out.collect(new Tuple1<>(edge.f1)); } } @@ -316,7 +317,7 @@ public class Graph<K, VV, EV> { new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() { public Edge<K, NullValue> map(Tuple2<K, K> input) { - return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance()); + return new Edge<>(input.f0, input.f1, NullValue.getInstance()); } }).withForwardedFields("f0; f1"); return fromDataSet(edgeDataSet, context); @@ -344,7 +345,7 @@ public class Graph<K, VV, EV> { new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() { public Edge<K, NullValue> map(Tuple2<K, K> input) { - return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance()); + return new Edge<>(input.f0, input.f1, NullValue.getInstance()); } }).withForwardedFields("f0; f1"); return fromDataSet(edgeDataSet, vertexValueInitializer, context); @@ -472,7 +473,7 @@ public class Graph<K, VV, EV> { public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector) throws Exception { - collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(), + collector.collect(new Tuple4<>(edge.getSource(), edge.getTarget(), vertex.getValue(), edge.getValue())); } } @@ -486,7 +487,7 @@ public class Graph<K, VV, EV> { public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet, Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception { - collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1, + collector.collect(new Triplet<>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1, tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3)); } } @@ -521,13 +522,13 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, NV>> mappedVertices = vertices.map( new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() { public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception { - return new Vertex<K, NV>(value.f0, mapper.map(value)); + return new Vertex<>(value.f0, mapper.map(value)); } }) .returns(returnType) .withForwardedFields("f0"); - return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context); + return new Graph<>(mappedVertices, this.edges, this.context); } /** @@ -596,14 +597,13 @@ public class Graph<K, VV, EV> { DataSet<Edge<K, NV>> mappedEdges = edges.map( new MapFunction<Edge<K, EV>, Edge<K, NV>>() { public Edge<K, NV> map(Edge<K, EV> value) throws Exception { - return new Edge<K, NV>(value.f0, value.f1, mapper - .map(value)); + return new Edge<>(value.f0, value.f1, mapper.map(value)); } }) .returns(returnType) .withForwardedFields("f0; f1"); - return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context); + return new Graph<>(this.vertices, mappedEdges, this.context); } /** @@ -628,7 +628,7 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, VV>> resultedVertices = this.getVertices() .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction)); - return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context); + return new Graph<>(resultedVertices, this.edges, this.context); } private static final class ApplyCoGroupToVertexValues<K, VV, T> @@ -651,7 +651,7 @@ public class Graph<K, VV, EV> { if (inputIterator.hasNext()) { final Tuple2<K, T> inputNext = inputIterator.next(); - collector.collect(new Vertex<K, VV>(inputNext.f0, vertexJoinFunction + collector.collect(new Vertex<>(inputNext.f0, vertexJoinFunction .vertexJoin(vertexIterator.next().f1, inputNext.f1))); } else { collector.collect(vertexIterator.next()); @@ -681,7 +681,7 @@ public class Graph<K, VV, EV> { DataSet<Edge<K, EV>> resultedEdges = this.getEdges() .coGroup(inputDataSet).where(0, 1).equalTo(0, 1) .with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction)); - return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); + return new Graph<>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValues<K, EV, T> @@ -704,7 +704,7 @@ public class Graph<K, VV, EV> { if (inputIterator.hasNext()) { final Tuple3<K, K, T> inputNext = inputIterator.next(); - collector.collect(new Edge<K, EV>(inputNext.f0, + collector.collect(new Edge<>(inputNext.f0, inputNext.f1, edgeJoinFunction.edgeJoin( edgesIterator.next().f2, inputNext.f2))); } else { @@ -736,7 +736,7 @@ public class Graph<K, VV, EV> { .coGroup(inputDataSet).where(0).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction)); - return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); + return new Graph<>(this.vertices, resultedEdges, this.context); } private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T> @@ -761,7 +761,7 @@ public class Graph<K, VV, EV> { while (edgesIterator.hasNext()) { Edge<K, EV> edgesNext = edgesIterator.next(); - collector.collect(new Edge<K, EV>(edgesNext.f0, + collector.collect(new Edge<>(edgesNext.f0, edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1))); } @@ -795,7 +795,7 @@ public class Graph<K, VV, EV> { .coGroup(inputDataSet).where(1).equalTo(0) .with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction)); - return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context); + return new Graph<>(this.vertices, resultedEdges, this.context); } /** @@ -817,8 +817,7 @@ public class Graph<K, VV, EV> { DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter); - return new Graph<K, VV, EV>(filteredVertices, filteredEdges, - this.context); + return new Graph<>(filteredVertices, filteredEdges, this.context); } /** @@ -837,7 +836,7 @@ public class Graph<K, VV, EV> { .join(filteredVertices).where(1).equalTo(0) .with(new ProjectEdge<K, VV, EV>()); - return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context); + return new Graph<>(filteredVertices, remainingEdges, this.context); } /** @@ -850,7 +849,7 @@ public class Graph<K, VV, EV> { public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) { DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter); - return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context); + return new Graph<>(this.vertices, filteredEdges, this.context); } @ForwardedFieldsFirst("f0; f1; f2") @@ -924,7 +923,7 @@ public class Graph<K, VV, EV> { public Graph<K, VV, EV> getUndirected() { DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>()); - return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context); + return new Graph<>(vertices, undirectedEdges, this.context); } /** @@ -947,13 +946,13 @@ public class Graph<K, VV, EV> { switch (direction) { case IN: return vertices.coGroup(edges).where(0).equalTo(1) - .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); + .with(new ApplyCoGroupFunction<>(edgesFunction)); case OUT: return vertices.coGroup(edges).where(0).equalTo(0) - .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)); + .with(new ApplyCoGroupFunction<>(edgesFunction)); case ALL: return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())) - .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)); + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -980,13 +979,13 @@ public class Graph<K, VV, EV> { switch (direction) { case IN: return vertices.coGroup(edges).where(0).equalTo(1) - .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo); case OUT: return vertices.coGroup(edges).where(0).equalTo(0) - .with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunction<>(edgesFunction)).returns(typeInfo); case ALL: return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())) - .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo); + .where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<>(edgesFunction)).returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1013,14 +1012,14 @@ public class Graph<K, VV, EV> { case IN: return edges.map(new ProjectVertexIdMap<K, EV>(1)) .withForwardedFields("f1->f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); case OUT: return edges.map(new ProjectVertexIdMap<K, EV>(0)) .withForwardedFields("f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); case ALL: return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()) - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1048,14 +1047,14 @@ public class Graph<K, VV, EV> { case IN: return edges.map(new ProjectVertexIdMap<K, EV>(1)) .withForwardedFields("f1->f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); case OUT: return edges.map(new ProjectVertexIdMap<K, EV>(0)) .withForwardedFields("f0") - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); case ALL: return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()) - .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo); + .groupBy(0).reduceGroup(new ApplyGroupReduceFunction<>(edgesFunction)).returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1072,7 +1071,7 @@ public class Graph<K, VV, EV> { @SuppressWarnings("unchecked") public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) { - return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition), edge); + return new Tuple2<>((K) edge.getField(fieldPosition), edge); } } @@ -1087,7 +1086,7 @@ public class Graph<K, VV, EV> { @SuppressWarnings("unchecked") public Tuple2<K, EV> map(Edge<K, EV> edge) { - return new Tuple2<K, EV>((K) edge.getField(fieldPosition), edge.getValue()); + return new Tuple2<>((K) edge.getField(fieldPosition), edge.getValue()); } } @@ -1114,8 +1113,8 @@ public class Graph<K, VV, EV> { Edge<K, EV>, Tuple2<K, Edge<K, EV>>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) { - out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge)); - out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge)); + out.collect(new Tuple2<>(edge.getSource(), edge)); + out.collect(new Tuple2<>(edge.getTarget(), edge)); } } @@ -1123,8 +1122,8 @@ public class Graph<K, VV, EV> { Edge<K, EV>, Tuple2<K, EV>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) { - out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue())); - out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue())); + out.collect(new Tuple2<>(edge.getSource(), edge.getValue())); + out.collect(new Tuple2<>(edge.getTarget(), edge.getValue())); } } @@ -1132,8 +1131,8 @@ public class Graph<K, VV, EV> { Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) { - out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge)); - out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge)); + out.collect(new Tuple3<>(edge.getSource(), edge.getTarget(), edge)); + out.collect(new Tuple3<>(edge.getTarget(), edge.getSource(), edge)); } } @@ -1224,7 +1223,7 @@ public class Graph<K, VV, EV> { implements MapFunction<Edge<K, EV>, Edge<K, EV>> { public Edge<K, EV> map(Edge<K, EV> value) { - return new Edge<K, EV>(value.f1, value.f0, value.f2); + return new Edge<>(value.f1, value.f0, value.f2); } } @@ -1233,8 +1232,8 @@ public class Graph<K, VV, EV> { @Override public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception { - out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2)); - out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2)); + out.collect(new Edge<>(edge.f0, edge.f1, edge.f2)); + out.collect(new Edge<>(edge.f1, edge.f0, edge.f2)); } } @@ -1246,7 +1245,7 @@ public class Graph<K, VV, EV> { */ public Graph<K, VV, EV> reverse() throws UnsupportedOperationException { DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>()); - return new Graph<K, VV, EV>(vertices, reversedEdges, this.context); + return new Graph<>(vertices, reversedEdges, this.context); } /** @@ -1290,7 +1289,7 @@ public class Graph<K, VV, EV> { implements MapFunction<Edge<K, EV>, Tuple2<K, K>> { @Override public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception { - return new Tuple2<K, K>(edge.f0, edge.f1); + return new Tuple2<>(edge.f0, edge.f1); } } @@ -1302,7 +1301,7 @@ public class Graph<K, VV, EV> { * @return the new graph containing the existing vertices as well as the one just added */ public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) { - List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>(); + List<Vertex<K, VV>> newVertex = new ArrayList<>(); newVertex.add(vertex); return addVertices(newVertex); @@ -1353,7 +1352,7 @@ public class Graph<K, VV, EV> { */ public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) { Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target), - Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)), + Collections.singletonList(new Edge<>(source.f0, target.f0, edgeValue)), this.context); return this.union(partialGraph); } @@ -1408,7 +1407,7 @@ public class Graph<K, VV, EV> { */ public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) { - List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<Vertex<K, VV>>(); + List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<>(); vertexToBeRemoved.add(vertex); return removeVertices(vertexToBeRemoved); @@ -1445,7 +1444,7 @@ public class Graph<K, VV, EV> { .join(newVertices).where(1).equalTo(0) .with(new ProjectEdge<K, VV, EV>()); - return new Graph<K, VV, EV>(newVertices, newEdges, context); + return new Graph<>(newVertices, newEdges, context); } private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> { @@ -1515,7 +1514,7 @@ public class Graph<K, VV, EV> { DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved)) .where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>()); - return new Graph<K, VV, EV>(this.vertices, newEdges, context); + return new Graph<>(this.vertices, newEdges, context); } private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> { @@ -1541,7 +1540,7 @@ public class Graph<K, VV, EV> { public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) { DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct(); DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges()); - return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context); + return new Graph<>(unionedVertices, unionedEdges, this.context); } /** @@ -1689,7 +1688,7 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration); - return new Graph<K, VV, EV>(newVertices, this.edges, this.context); + return new Graph<>(newVertices, this.edges, this.context); } /** @@ -1706,7 +1705,7 @@ public class Graph<K, VV, EV> { * after maximumNumberOfIterations. */ public <M> Graph<K, VV, EV> runGatherSumApplyIteration( - org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction, + org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction, ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) { return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction, @@ -1727,7 +1726,7 @@ public class Graph<K, VV, EV> { * after maximumNumberOfIterations. */ public <M> Graph<K, VV, EV> runGatherSumApplyIteration( - org.apache.flink.graph.gsa.GatherFunction gatherFunction, SumFunction<VV, EV, M> sumFunction, + org.apache.flink.graph.gsa.GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction, ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations, GSAConfiguration parameters) { @@ -1738,7 +1737,7 @@ public class Graph<K, VV, EV> { DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration); - return new Graph<K, VV, EV>(newVertices, this.edges, this.context); + return new Graph<>(newVertices, this.edges, this.context); } /** @@ -1776,10 +1775,10 @@ public class Graph<K, VV, EV> { VertexCentricConfiguration parameters) { VertexCentricIteration<K, VV, EV, M> iteration = VertexCentricIteration.withEdges( - edges, computeFunction, maximumNumberOfIterations); + edges, computeFunction, combiner, maximumNumberOfIterations); iteration.configure(parameters); DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration); - return new Graph<K, VV, EV>(newVertices, this.edges, this.context); + return new Graph<>(newVertices, this.edges, this.context); } /** @@ -1831,14 +1830,14 @@ public class Graph<K, VV, EV> { .join(this.vertices).where(0).equalTo(0); return vertices.coGroup(edgesWithSources) .where(0).equalTo("f0.f1") - .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)); case OUT: // create <edge-targetVertex> pairs DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges .join(this.vertices).where(1).equalTo(0); return vertices.coGroup(edgesWithTargets) .where(0).equalTo("f0.f0") - .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)); case ALL: // create <edge-sourceOrTargetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges @@ -1848,7 +1847,7 @@ public class Graph<K, VV, EV> { return vertices.coGroup(edgesWithNeighbors) .where(0).equalTo(0) - .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)); + .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1879,14 +1878,14 @@ public class Graph<K, VV, EV> { .join(this.vertices).where(0).equalTo(0); return vertices.coGroup(edgesWithSources) .where(0).equalTo("f0.f1") - .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo); case OUT: // create <edge-targetVertex> pairs DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges .join(this.vertices).where(1).equalTo(0); return vertices.coGroup(edgesWithTargets) .where(0).equalTo("f0.f0") - .with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + .with(new ApplyNeighborCoGroupFunction<>(neighborsFunction)).returns(typeInfo); case ALL: // create <edge-sourceOrTargetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges @@ -1896,7 +1895,7 @@ public class Graph<K, VV, EV> { return vertices.coGroup(edgesWithNeighbors) .where(0).equalTo(0) - .with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + .with(new ApplyCoGroupFunctionOnAllNeighbors<>(neighborsFunction)).returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1927,7 +1926,7 @@ public class Graph<K, VV, EV> { .with(new ProjectVertexIdJoin<K, VV, EV>(1)) .withForwardedFieldsFirst("f1->f0"); return edgesWithSources.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); case OUT: // create <edge-targetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges @@ -1935,7 +1934,7 @@ public class Graph<K, VV, EV> { .with(new ProjectVertexIdJoin<K, VV, EV>(0)) .withForwardedFieldsFirst("f0"); return edgesWithTargets.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); case ALL: // create <edge-sourceOrTargetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges @@ -1944,7 +1943,7 @@ public class Graph<K, VV, EV> { .with(new ProjectEdgeWithNeighbor<K, VV, EV>()); return edgesWithNeighbors.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -1976,7 +1975,7 @@ public class Graph<K, VV, EV> { .with(new ProjectVertexIdJoin<K, VV, EV>(1)) .withForwardedFieldsFirst("f1->f0"); return edgesWithSources.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); case OUT: // create <edge-targetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges @@ -1984,7 +1983,7 @@ public class Graph<K, VV, EV> { .with(new ProjectVertexIdJoin<K, VV, EV>(0)) .withForwardedFieldsFirst("f0"); return edgesWithTargets.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); case ALL: // create <edge-sourceOrTargetVertex> pairs DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges @@ -1993,7 +1992,7 @@ public class Graph<K, VV, EV> { .with(new ProjectEdgeWithNeighbor<K, VV, EV>()); return edgesWithNeighbors.groupBy(0).reduceGroup( - new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo); + new ApplyNeighborGroupReduceFunction<>(neighborsFunction)).returns(typeInfo); default: throw new IllegalArgumentException("Illegal edge direction"); } @@ -2031,7 +2030,7 @@ public class Graph<K, VV, EV> { @SuppressWarnings("unchecked") public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, Collector<Tuple2<K, VV>> out) { - out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue())); + out.collect(new Tuple2<>((K) edge.getField(fieldPosition), otherVertex.getValue())); } } @@ -2047,7 +2046,7 @@ public class Graph<K, VV, EV> { @SuppressWarnings("unchecked") public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { - out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex)); + out.collect(new Tuple3<>((K) edge.getField(fieldPosition), edge, otherVertex)); } } @@ -2059,7 +2058,7 @@ public class Graph<K, VV, EV> { public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, Collector<Tuple2<K, VV>> out) { - out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue())); + out.collect(new Tuple2<>(keysWithEdge.f0, neighbor.getValue())); } } @@ -2070,7 +2069,7 @@ public class Graph<K, VV, EV> { public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor, Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) { - out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor)); + out.collect(new Tuple3<>(keysWithEdge.f0, keysWithEdge.f2, neighbor)); } } @@ -2119,7 +2118,7 @@ public class Graph<K, VV, EV> { @Override public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() { Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next(); - return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2); + return new Tuple2<>(next.f1, next.f2); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java index 08cf011..8ec9650 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java @@ -26,5 +26,5 @@ package org.apache.flink.graph; */ public interface GraphAlgorithm<K, VV, EV, T> { - public T run(Graph<K, VV, EV> input) throws Exception; + T run(Graph<K, VV, EV> input) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java index f93abc4..4859e12 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java @@ -104,7 +104,7 @@ public class GraphCsvReader { public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue, Class<EV> edgeValue) { - DataSet<Tuple2<K, VV>> vertices = null; + DataSet<Tuple2<K, VV>> vertices; if (edgeReader == null) { throw new RuntimeException("The edges input file cannot be null!"); @@ -160,9 +160,9 @@ public class GraphCsvReader { private static final long serialVersionUID = -2981792951286476970L; public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) { - return new Tuple3<K, K, NullValue>(edge.f0, edge.f1, NullValue.getInstance()); + return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance()); } - }).withForwardedFields("f0;f1");; + }).withForwardedFields("f0;f1"); return Graph.fromTupleDataSet(edges, executionContext); } @@ -179,7 +179,7 @@ public class GraphCsvReader { @SuppressWarnings({ "serial", "unchecked" }) public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) { - DataSet<Tuple2<K, VV>> vertices = null; + DataSet<Tuple2<K, VV>> vertices; if (edgeReader == null) { throw new RuntimeException("The edges input file cannot be null!"); @@ -189,7 +189,7 @@ public class GraphCsvReader { .map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() { public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) { - return new Tuple3<K, K, NullValue>(input.f0, input.f1, NullValue.getInstance()); + return new Tuple3<>(input.f0, input.f1, NullValue.getInstance()); } }).withForwardedFields("f0;f1"); http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java index 0b98a27..964d20e 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java @@ -37,7 +37,7 @@ public abstract class IterationConfiguration { private int parallelism = -1; /** the iteration aggregators **/ - private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>(); + private Map<String, Aggregator<?>> aggregators = new HashMap<>(); /** flag that defines whether the solution set is kept in managed memory **/ private boolean unmanagedSolutionSet = false; http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java index dee3480..2ae0903 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java @@ -64,14 +64,14 @@ public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> { } public Vertex<K, VV> getSrcVertex() { - return new Vertex<K, VV>(this.f0, this.f2); + return new Vertex<>(this.f0, this.f2); } public Vertex<K, VV> getTrgVertex() { - return new Vertex<K, VV>(this.f1, this.f3); + return new Vertex<>(this.f1, this.f3); } public Edge<K, EV> getEdge() { - return new Edge<K, EV>(this.f0, this.f1, this.f4); + return new Edge<>(this.f0, this.f1, this.f4); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java index a30d1a2..f40dac9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java @@ -18,9 +18,10 @@ package org.apache.flink.graph; -import java.io.Serializable; - import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.java.DataSet; + +import java.io.Serializable; /** * Interface to be implemented by the transformation function http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java index e9add7c..f05c254 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java @@ -105,7 +105,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable { * @return The aggregator registered under this name, or null, if no aggregator was registered. */ public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -115,7 +115,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable { * @return The aggregated value of the previous iteration. */ public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -126,7 +126,7 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable { * @return The broadcast data set. */ public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java index 8d24f16..079b4c7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GSAConfiguration.java @@ -39,13 +39,13 @@ import java.util.List; public class GSAConfiguration extends IterationConfiguration { /** the broadcast variables for the gather function **/ - private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<Tuple2<String,DataSet<?>>>(); + private List<Tuple2<String, DataSet<?>>> bcVarsGather = new ArrayList<>(); /** the broadcast variables for the sum function **/ - private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<Tuple2<String,DataSet<?>>>(); + private List<Tuple2<String, DataSet<?>>> bcVarsSum = new ArrayList<>(); /** the broadcast variables for the apply function **/ - private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<Tuple2<String,DataSet<?>>>(); + private List<Tuple2<String, DataSet<?>>> bcVarsApply = new ArrayList<>(); private EdgeDirection direction = EdgeDirection.OUT; http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java index d914f2a..90db9da 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java @@ -94,7 +94,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable { * @return The aggregator registered under this name, or null, if no aggregator was registered. */ public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -104,7 +104,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable { * @return The aggregated value of the previous iteration. */ public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -115,7 +115,7 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable { * @return The broadcast data set. */ public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java index d1b12f9..442ce68 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java @@ -39,7 +39,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.utils.GraphUtils; import org.apache.flink.types.LongValue; @@ -119,13 +118,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati // Prepare type information TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0); TypeInformation<M> messageType = TypeExtractor.createTypeInfo(gather, GatherFunction.class, gather.getClass(), 2); - TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType); + TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<>(keyType, messageType); TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType(); - // create a graph - Graph<K, VV, EV> graph = - Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment()); - // check whether the numVertices option is set and, if so, compute the total number of vertices // and set it within the gather, sum and apply functions @@ -139,9 +134,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati } // Prepare UDFs - GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType); - SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType); - ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType); + GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<>(gather, innerType); + SumUdf<K, VV, EV, M> sumUdf = new SumUdf<>(sum, innerType); + ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<>(apply, outputType); final int[] zeroKeyPos = new int[] {0}; final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration = @@ -268,11 +263,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati * * @return An in stance of the gather-sum-apply graph computation operator. */ - public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M> + public static <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) { - return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations); + return new GatherSumApplyIteration<>(gather, sum, apply, edges, maximumNumberOfIterations); } // -------------------------------------------------------------------------------------------- @@ -295,7 +290,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati @Override public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) { M result = this.gatherFunction.gather(neighborTuple.f1); - return new Tuple2<K, M>(neighborTuple.f0, result); + return new Tuple2<>(neighborTuple.f0, result); } @Override @@ -337,7 +332,7 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception { K key = arg0.f0; M result = this.sumFunction.sum(arg0.f1, arg1.f1); - return new Tuple2<K, M>(key, result); + return new Tuple2<>(key, result); } @Override @@ -411,8 +406,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> { public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) { - out.collect(new Tuple2<K, Neighbor<VV, EV>>( - edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue()))); + out.collect(new Tuple2<>( + edge.getTarget(), new Neighbor<>(vertex.getValue(), edge.getValue()))); } } @@ -422,8 +417,8 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> { public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) { - out.collect(new Tuple2<K, Neighbor<VV, EV>>( - edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue()))); + out.collect(new Tuple2<>( + edge.getSource(), new Neighbor<>(vertex.getValue(), edge.getValue()))); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java index 68e8d27..e70af1f 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java @@ -94,7 +94,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable { * @return The aggregator registered under this name, or null, if no aggregator was registered. */ public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -104,7 +104,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable { * @return The aggregated value of the previous iteration. */ public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -115,7 +115,7 @@ public abstract class SumFunction<VV, EV, M> implements Serializable { * @return The broadcast data set. */ public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java index f39d858..4656757 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java @@ -51,7 +51,7 @@ public class GSASingleSourceShortestPaths<K> implements @Override public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) { - return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) + return input.mapVertices(new InitVerticesMapper<>(srcVertexId)) .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance<K>(), maxIterations) .getVertices(); @@ -85,7 +85,7 @@ public class GSASingleSourceShortestPaths<K> implements public Double gather(Neighbor<Double, Double> neighbor) { return neighbor.getNeighborValue() + neighbor.getEdgeValue(); } - }; + } @SuppressWarnings("serial") private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { @@ -93,7 +93,7 @@ public class GSASingleSourceShortestPaths<K> implements public Double sum(Double newValue, Double currentValue) { return Math.min(newValue, currentValue); } - }; + } @SuppressWarnings("serial") private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> { http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java index 2d13dfd..96e5afc 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java @@ -112,7 +112,7 @@ public class LabelPropagation<K, VV extends Comparable<VV>, EV> public static final class UpdateVertexLabel<K, VV extends Comparable<VV>> extends GatherFunction<K, VV, VV> { public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) { - Map<VV, Long> labelsWithFrequencies = new HashMap<VV, Long>(); + Map<VV, Long> labelsWithFrequencies = new HashMap<>(); long maxFrequency = 1; VV mostFrequentLabel = vertex.getValue(); http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 4ff4e79..5aa1ac1 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -51,7 +51,7 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D @Override public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) { - return input.mapVertices(new InitVerticesMapper<K>(srcVertexId)) + return input.mapVertices(new InitVerticesMapper<>(srcVertexId)) .runScatterGatherIteration(new MinDistanceMessenger<K>(), new VertexDistanceUpdater<K>(), maxIterations).getVertices(); } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java index 681d060..dabeb06 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java @@ -110,7 +110,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements private static final class DegreeCounter<K extends Comparable<K>, EV> implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> { - final ArrayList<K> otherVertices = new ArrayList<K>(); + final ArrayList<K> otherVertices = new ArrayList<>(); final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>(); @Override http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java index db64f63..08c15e9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java @@ -18,10 +18,6 @@ package org.apache.flink.graph.pregel; -import java.io.Serializable; -import java.util.Collection; -import java.util.Iterator; - import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.java.DataSet; @@ -33,6 +29,10 @@ import org.apache.flink.types.Either; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; + /** * The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}. * @@ -86,7 +86,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl */ public final Iterable<Edge<K, EV>> getEdges() { verifyEdgeUsage(); - this.edgeIterator.set((Iterator<Edge<K, EV>>) edges); + this.edgeIterator.set(edges); return this.edgeIterator; } @@ -100,7 +100,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl verifyEdgeUsage(); outMsg.setField(m, 1); while (edges.hasNext()) { - Tuple next = (Tuple) edges.next(); + Tuple next = edges.next(); outMsg.setField(next.getField(1), 0); out.collect(Either.Right(outMsg)); } @@ -157,7 +157,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl * @return The aggregator registered under this name, or {@code null}, if no aggregator was registered. */ public final <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -167,7 +167,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl * @return The aggregated value of the previous iteration. */ public final <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -179,7 +179,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl * @return The broadcast data set. */ public final <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- @@ -204,9 +204,9 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl void init(IterationRuntimeContext context) { this.runtimeContext = context; - this.outVertex = new Vertex<K, VV>(); - this.outMsg = new Tuple2<K, Message>(); - this.edgeIterator = new EdgesIterator<K, EV>(); + this.outVertex = new Vertex<>(); + this.outMsg = new Tuple2<>(); + this.edgeIterator = new EdgesIterator<>(); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java index 70c8262..9398d8d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java @@ -18,15 +18,15 @@ package org.apache.flink.graph.pregel; -import java.io.Serializable; - import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Either; import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; +import java.io.Serializable; + /** - * The base class for combining messages sent during a {@link VertexCentricteration}. + * The base class for combining messages sent during a {@link VertexCentricIteration}. * * @param <K> The type of the vertex id * @param <Message> The type of the message sent between vertices along the edges. @@ -37,15 +37,12 @@ public abstract class MessageCombiner<K, Message> implements Serializable { private Collector<Tuple2<K, Either<NullValue, Message>>> out; - private K vertexId; - private Tuple2<K, Either<NullValue, Message>> outValue; void set(K target, Collector<Tuple2<K, Either<NullValue, Message>>> collector) { - this.vertexId = target; this.out = collector; - this.outValue = new Tuple2<K, Either<NullValue, Message>>(); - outValue.setField(vertexId, 0); + this.outValue = new Tuple2<>(); + outValue.setField(target, 0); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java index 026e869..a0f793a 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java @@ -37,7 +37,7 @@ import java.util.List; public class VertexCentricConfiguration extends IterationConfiguration { /** the broadcast variables for the compute function **/ - private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<Tuple2<String,DataSet<?>>>(); + private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>(); public VertexCentricConfiguration() {} http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java index 8d96776..ebf2e8d 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricIteration.java @@ -18,24 +18,21 @@ package org.apache.flink.graph.pregel; -import java.util.Iterator; -import java.util.Map; - import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupCombineFunction; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.RichCoGroupFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; import org.apache.flink.api.java.operators.CoGroupOperator; -import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.operators.CustomUnaryOperation; +import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.EitherTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -50,6 +47,9 @@ import org.apache.flink.types.NullValue; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; +import java.util.Iterator; +import java.util.Map; + /** * This class represents iterative graph computations, programmed in a vertex-centric perspective. * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been @@ -158,14 +158,14 @@ public class VertexCentricIteration<K, VV, EV, Message> // prepare the type information TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0); TypeInformation<Tuple2<K, Message>> messageTypeInfo = - new TupleTypeInfo<Tuple2<K, Message>>(keyType, messageType); + new TupleTypeInfo<>(keyType, messageType); TypeInformation<Vertex<K, VV>> vertexType = initialVertices.getType(); TypeInformation<Either<Vertex<K, VV>, Tuple2<K, Message>>> intermediateTypeInfo = - new EitherTypeInfo<Vertex<K, VV>, Tuple2<K, Message>>(vertexType, messageTypeInfo); + new EitherTypeInfo<>(vertexType, messageTypeInfo); TypeInformation<Either<NullValue, Message>> nullableMsgTypeInfo = - new EitherTypeInfo<NullValue, Message>(TypeExtractor.getForClass(NullValue.class), messageType); + new EitherTypeInfo<>(TypeExtractor.getForClass(NullValue.class), messageType); TypeInformation<Tuple2<K, Either<NullValue, Message>>> workSetTypeInfo = - new TupleTypeInfo<Tuple2<K, Either<NullValue, Message>>>(keyType, nullableMsgTypeInfo); + new TupleTypeInfo<>(keyType, nullableMsgTypeInfo); DataSet<Tuple2<K, Either<NullValue, Message>>> initialWorkSet = initialVertices.map( new InitializeWorkSet<K, VV, Message>()).returns(workSetTypeInfo); @@ -183,7 +183,7 @@ public class VertexCentricIteration<K, VV, EV, Message> vertexType, nullableMsgTypeInfo)); VertexComputeUdf<K, VV, EV, Message> vertexUdf = - new VertexComputeUdf<K, VV, EV, Message>(computeFunction, intermediateTypeInfo); + new VertexComputeUdf<>(computeFunction, intermediateTypeInfo); CoGroupOperator<?, ?, Either<Vertex<K, VV>, Tuple2<K, Message>>> superstepComputation = verticesWithMsgs.coGroup(edgesWithValue) @@ -204,7 +204,7 @@ public class VertexCentricIteration<K, VV, EV, Message> if (combineFunction != null) { MessageCombinerUdf<K, Message> combinerUdf = - new MessageCombinerUdf<K, Message>(combineFunction, workSetTypeInfo); + new MessageCombinerUdf<>(combineFunction, workSetTypeInfo); DataSet<Tuple2<K, Either<NullValue, Message>>> combinedMessages = allMessages .groupBy(0).reduceGroup(combinerUdf) @@ -237,12 +237,12 @@ public class VertexCentricIteration<K, VV, EV, Message> * * @return An instance of the vertex-centric graph computation operator. */ - public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( + public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf, int maximumNumberOfIterations) { - return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, null, - maximumNumberOfIterations); + return new VertexCentricIteration<>(cf, edgesWithValue, null, + maximumNumberOfIterations); } /** @@ -260,12 +260,12 @@ public class VertexCentricIteration<K, VV, EV, Message> * * @return An instance of the vertex-centric graph computation operator. */ - public static final <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( + public static <K, VV, EV, Message> VertexCentricIteration<K, VV, EV, Message> withEdges( DataSet<Edge<K, EV>> edgesWithValue, ComputeFunction<K, VV, EV, Message> cf, MessageCombiner<K, Message> mc, int maximumNumberOfIterations) { - return new VertexCentricIteration<K, VV, EV, Message>(cf, edgesWithValue, mc, - maximumNumberOfIterations); + return new VertexCentricIteration<>(cf, edgesWithValue, mc, + maximumNumberOfIterations); } /** @@ -297,7 +297,7 @@ public class VertexCentricIteration<K, VV, EV, Message> @Override public void open(Configuration parameters) { - outTuple = new Tuple2<K, Either<NullValue, Message>>(); + outTuple = new Tuple2<>(); nullMessage = Either.Left(NullValue.getInstance()); outTuple.setField(nullMessage, 1); } @@ -308,12 +308,12 @@ public class VertexCentricIteration<K, VV, EV, Message> } } - @SuppressWarnings("serial") /** * This coGroup class wraps the user-defined compute function. * The first input holds a Tuple2 containing the vertex state and its inbox. * The second input is an iterator of the out-going edges of this vertex. */ + @SuppressWarnings("serial") private static class VertexComputeUdf<K, VV, EV, Message> extends RichCoGroupFunction< Tuple2<Vertex<K, VV>, Either<NullValue, Message>>, Edge<K, EV>, Either<Vertex<K, VV>, Tuple2<K, Message>>> @@ -469,8 +469,7 @@ public class VertexCentricIteration<K, VV, EV, Message> JoinFunction<Vertex<K, VV>, Tuple2<K, Either<NullValue, Message>>, Tuple2<Vertex<K, VV>, Either<NullValue, Message>>> { - private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple = - new Tuple2<Vertex<K, VV>, Either<NullValue, Message>>(); + private Tuple2<Vertex<K, VV>, Either<NullValue, Message>> outTuple = new Tuple2<>(); public Tuple2<Vertex<K, VV>, Either<NullValue, Message>> join( Vertex<K, VV> vertex, Tuple2<K, Either<NullValue, Message>> message) { @@ -498,7 +497,7 @@ public class VertexCentricIteration<K, VV, EV, Message> private static final class ProjectMessages<K, VV, Message> implements FlatMapFunction<Either<Vertex<K, VV>, Tuple2<K, Message>>, Tuple2<K, Either<NullValue, Message>>> { - private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<K, Either<NullValue, Message>>(); + private Tuple2<K, Either<NullValue, Message>> outTuple = new Tuple2<>(); public void flatMap(Either<Vertex<K, VV>, Tuple2<K, Message>> value, Collector<Tuple2<K, Either<NullValue, Message>>> out) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java index d56c0da..93b3a8c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/GatherFunction.java @@ -141,7 +141,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable { * @return The aggregator registered under this name, or null, if no aggregator was registered. */ public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -151,7 +151,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable { * @return The aggregated value of the previous iteration. */ public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -163,7 +163,7 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable { * @return The broadcast data set. */ public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- @@ -243,8 +243,8 @@ public abstract class GatherFunction<K, VV, Message> implements Serializable { <VertexWithDegree> void updateVertexFromScatterGatherIteration(Vertex<K, VertexWithDegree> vertexState, MessageIterator<Message> inMessages) throws Exception { - Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0, - ((Tuple3<VV, Long, Long>)vertexState.getValue()).f0); + Vertex<K, VV> vertex = new Vertex<>(vertexState.f0, + ((Tuple3<VV, Long, Long>) vertexState.getValue()).f0); updateVertex(vertex, inMessages); } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java index 336e73d..b99b5b7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterFunction.java @@ -217,7 +217,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl * @return The aggregator registered under this name, or null, if no aggregator was registered. */ public <T extends Aggregator<?>> T getIterationAggregator(String name) { - return this.runtimeContext.<T>getIterationAggregator(name); + return this.runtimeContext.getIterationAggregator(name); } /** @@ -227,7 +227,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl * @return The aggregated value of the previous iteration. */ public <T extends Value> T getPreviousIterationAggregate(String name) { - return this.runtimeContext.<T>getPreviousIterationAggregate(name); + return this.runtimeContext.getPreviousIterationAggregate(name); } /** @@ -239,7 +239,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl * @return The broadcast data set. */ public <T> Collection<T> getBroadcastSet(String name) { - return this.runtimeContext.<T>getBroadcastVariable(name); + return this.runtimeContext.getBroadcastVariable(name); } // -------------------------------------------------------------------------------------------- @@ -266,8 +266,8 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl void init(IterationRuntimeContext context) { this.runtimeContext = context; - this.outValue = new Tuple2<K, Message>(); - this.edgeIterator = new EdgesIterator<K, EV>(); + this.outValue = new Tuple2<>(); + this.edgeIterator = new EdgesIterator<>(); } void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out, K id) { @@ -282,7 +282,7 @@ public abstract class ScatterFunction<K, VV, Message, EV> implements Serializabl { private Iterator<Edge<K, EV>> input; - private Edge<K, EV> edge = new Edge<K, EV>(); + private Edge<K, EV> edge = new Edge<>(); void set(Iterator<Edge<K, EV>> input) { this.input = input; http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java index fde305f..8049932 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java @@ -196,7 +196,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> * * @return An in stance of the scatter-gather graph computation operator. */ - public static final <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges( + public static <K, VV, Message, EV> ScatterGatherIteration<K, VV, Message, EV> withEdges( DataSet<Edge<K, EV>> edgesWithValue, ScatterFunction<K, VV, Message, EV> sf, GatherFunction<K, VV, Message> gf, int maximumNumberOfIterations) { @@ -588,8 +588,7 @@ public class ScatterGatherIteration<K, VV, Message, EV> throw new IllegalArgumentException("Illegal edge direction"); } - GatherUdf<K, VV, Message> updateUdf = - new GatherUdfSimpleVV<K, VV, Message>(gatherFunction, vertexTypes); + GatherUdf<K, VV, Message> updateUdf = new GatherUdfSimpleVV<>(gatherFunction, vertexTypes); // build the update function (co group) CoGroupOperator<?, ?, Vertex<K, VV>> updates = http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java index 33d469b..b620dd8 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java @@ -50,8 +50,8 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> { public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { - out.collect(new Tuple1<K>(edge.f0)); - out.collect(new Tuple1<K>(edge.f1)); + out.collect(new Tuple1<>(edge.f0)); + out.collect(new Tuple1<>(edge.f1)); } } @@ -67,7 +67,7 @@ public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> { public Tuple1<K> map(K key) throws Exception { - return new Tuple1<K>(key); + return new Tuple1<>(key); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java index 0cf026c..ba6bd05 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/TranslateTest.java @@ -33,7 +33,6 @@ import org.junit.Test; import java.util.LinkedList; import java.util.List; -import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues; import static org.junit.Assert.assertEquals; public class TranslateTest { http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index 1e44d5b..98ecf16 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; @@ -240,10 +239,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("gatherBcastSet"); - Assert.assertEquals(1, bcastSet.get(0)); - Assert.assertEquals(2, bcastSet.get(1)); - Assert.assertEquals(3, bcastSet.get(2)); + List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("gatherBcastSet"); + Assert.assertEquals(1, bcastSet.get(0).intValue()); + Assert.assertEquals(2, bcastSet.get(1).intValue()); + Assert.assertEquals(3, bcastSet.get(2).intValue()); // test aggregator if (getSuperstepNumber() == 2) { @@ -271,10 +270,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("sumBcastSet"); - Assert.assertEquals(4, bcastSet.get(0)); - Assert.assertEquals(5, bcastSet.get(1)); - Assert.assertEquals(6, bcastSet.get(2)); + List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("sumBcastSet"); + Assert.assertEquals(4, bcastSet.get(0).intValue()); + Assert.assertEquals(5, bcastSet.get(1).intValue()); + Assert.assertEquals(6, bcastSet.get(2).intValue()); // test aggregator aggregator = getIterationAggregator("superstepAggregator"); @@ -286,7 +285,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase public Long sum(Long newValue, Long currentValue) { long superstep = getSuperstepNumber(); aggregator.aggregate(superstep); - return 0l; + return 0L; } } @@ -300,10 +299,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("applyBcastSet"); - Assert.assertEquals(7, bcastSet.get(0)); - Assert.assertEquals(8, bcastSet.get(1)); - Assert.assertEquals(9, bcastSet.get(2)); + List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("applyBcastSet"); + Assert.assertEquals(7, bcastSet.get(0).intValue()); + Assert.assertEquals(8, bcastSet.get(1).intValue()); + Assert.assertEquals(9, bcastSet.get(2).intValue()); // test aggregator aggregator = getIterationAggregator("superstepAggregator"); @@ -338,7 +337,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase private static final class DummySum extends SumFunction<Long, Long, Long> { public Long sum(Long newValue, Long currentValue) { - return 0l; + return 0L; } } @@ -354,7 +353,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> { public Long map(Vertex<Long, Long> value) { - return 1l; + return 1L; } } @@ -363,7 +362,7 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase @Override public HashSet<Long> map(Vertex<Long, Long> value) throws Exception { - HashSet<Long> h = new HashSet<Long>(); + HashSet<Long> h = new HashSet<>(); h.add(value.getId()); return h; } http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java index fcd0d82..2213700 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/ScatterGatherConfigurationITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.aggregators.LongSumAggregator; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; @@ -521,10 +520,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet"); - Assert.assertEquals(4, bcastSet.get(0)); - Assert.assertEquals(5, bcastSet.get(1)); - Assert.assertEquals(6, bcastSet.get(2)); + List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("messagingBcastSet"); + Assert.assertEquals(4, bcastSet.get(0).intValue()); + Assert.assertEquals(5, bcastSet.get(1).intValue()); + Assert.assertEquals(6, bcastSet.get(2).intValue()); // test number of vertices Assert.assertEquals(5, getNumberOfVertices()); @@ -569,10 +568,10 @@ public class ScatterGatherConfigurationITCase extends MultipleProgramsTestBase { // test bcast variable @SuppressWarnings("unchecked") - List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet"); - Assert.assertEquals(1, bcastSet.get(0)); - Assert.assertEquals(2, bcastSet.get(1)); - Assert.assertEquals(3, bcastSet.get(2)); + List<Integer> bcastSet = (List<Integer>)(List<?>)getBroadcastSet("updateBcastSet"); + Assert.assertEquals(1, bcastSet.get(0).intValue()); + Assert.assertEquals(2, bcastSet.get(1).intValue()); + Assert.assertEquals(3, bcastSet.get(2).intValue()); // test aggregator aggregator = getIterationAggregator("superstepAggregator"); http://git-wip-us.apache.org/repos/asf/flink/blob/bb34133e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java index 99c66ec..3ccdef0 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.graph.test.operations; -import com.google.common.base.Charsets; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.FileInputSplit; @@ -30,10 +29,12 @@ import org.apache.flink.types.NullValue; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import java.nio.charset.Charset; import java.util.List; @RunWith(Parameterized.class) @@ -193,7 +194,7 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase { tempFile.deleteOnExit(); OutputStreamWriter wrt = new OutputStreamWriter( - new FileOutputStream(tempFile), Charsets.UTF_8 + new FileOutputStream(tempFile), Charset.forName("UTF-8") ); wrt.write(content); wrt.close();
