http://git-wip-us.apache.org/repos/asf/flink/blob/cb282067/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java index 84d7722..b26bb43 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java @@ -41,11 +41,11 @@ import java.util.Objects; @RunWith(Parameterized.class) public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { - public ReduceOnEdgesMethodsITCase(TestExecutionMode mode){ + public ReduceOnEdgesMethodsITCase(TestExecutionMode mode) { super(mode); } - private String expectedResult; + private String expectedResult; @Test public void testLowestWeightOutNeighbor() throws Exception { @@ -54,20 +54,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect(); - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + compareResultAsTuples(result, expectedResult); } @@ -78,19 +77,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + compareResultAsTuples(result, expectedResult); } @@ -101,20 +100,20 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors = - graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect(); expectedResult = "1,2\n" + - "1,3\n" + - "2,3\n" + - "3,4\n" + - "3,5\n" + - "4,5\n" + - "5,1"; - + "1,3\n" + + "2,3\n" + + "3,4\n" + + "3,5\n" + + "4,5\n" + + "5,1"; + compareResultAsTuples(result, expectedResult); } @@ -125,19 +124,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors = - graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect(); expectedResult = "1,2\n" + - "1,3\n" + - "2,3\n" + - "3,4\n" + - "3,5\n" + - "4,5"; - + "1,3\n" + + "2,3\n" + + "3,4\n" + + "3,5\n" + + "4,5"; + compareResultAsTuples(result, expectedResult); } @@ -148,17 +147,17 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors = - graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); + graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithAllOutNeighbors.collect(); expectedResult = "3,4\n" + - "3,5\n" + - "4,5\n" + - "5,1"; - + "3,5\n" + + "4,5\n" + + "5,1"; + compareResultAsTuples(result, expectedResult); } @@ -169,20 +168,20 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors = - graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect(); expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "3,2\n" + - "4,3\n" + - "5,3\n" + - "5,4"; - + "2,1\n" + + "3,1\n" + + "3,2\n" + + "4,3\n" + + "5,3\n" + + "5,4"; + compareResultAsTuples(result, expectedResult); } @@ -193,18 +192,18 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors = - graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect(); expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "3,2\n" + - "4,3"; - + "2,1\n" + + "3,1\n" + + "3,2\n" + + "4,3"; + compareResultAsTuples(result, expectedResult); } @@ -215,18 +214,18 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors = - graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); + graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithAllInNeighbors.collect(); expectedResult = "3,1\n" + - "3,2\n" + - "4,3\n" + - "5,3\n" + - "5,4"; - + "3,2\n" + + "4,3\n" + + "5,3\n" + + "5,4"; + compareResultAsTuples(result, expectedResult); } @@ -237,27 +236,27 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors = - graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect(); expectedResult = "1,2\n" + - "1,3\n" + - "1,5\n" + - "2,1\n" + - "2,3\n" + - "3,1\n" + - "3,2\n" + - "3,4\n" + - "3,5\n" + - "4,3\n" + - "4,5\n" + - "5,1\n" + - "5,3\n" + - "5,4"; - + "1,3\n" + + "1,5\n" + + "2,1\n" + + "2,3\n" + + "3,1\n" + + "3,2\n" + + "3,4\n" + + "3,5\n" + + "4,3\n" + + "4,5\n" + + "5,1\n" + + "5,3\n" + + "5,4"; + compareResultAsTuples(result, expectedResult); } @@ -268,22 +267,22 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors = - graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect(); expectedResult = "1,2\n" + - "1,3\n" + - "1,5\n" + - "3,1\n" + - "3,2\n" + - "3,4\n" + - "3,5\n" + - "4,3\n" + - "4,5"; - + "1,3\n" + + "1,5\n" + + "3,1\n" + + "3,2\n" + + "3,4\n" + + "3,5\n" + + "4,3\n" + + "4,5"; + compareResultAsTuples(result, expectedResult); } @@ -294,16 +293,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors = - graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); + graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithAllNeighbors.collect(); expectedResult = "5,1\n" + - "5,3\n" + - "5,4"; - + "5,3\n" + + "5,4"; + compareResultAsTuples(result, expectedResult); } @@ -314,19 +313,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * of a vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithMaxEdgeWeight.collect(); expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + compareResultAsTuples(result, expectedResult); } @@ -337,19 +336,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,12\n" + - "2,23\n" + - "3,34\n" + - "4,45\n" + - "5,51\n"; - + "2,23\n" + + "3,34\n" + + "4,45\n" + + "5,51\n"; + compareResultAsTuples(result, expectedResult); } @@ -360,19 +359,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * of each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN); + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithLowestOutNeighbor.collect(); expectedResult = "1,51\n" + - "2,12\n" + - "3,13\n" + - "4,34\n" + - "5,35\n"; - + "2,12\n" + + "3,13\n" + + "4,34\n" + + "5,35\n"; + compareResultAsTuples(result, expectedResult); } @@ -383,19 +382,19 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { * of a vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithMaxEdgeWeight.collect(); expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + compareResultAsTuples(result, expectedResult); } @@ -404,12 +403,12 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { + long weight = Long.MAX_VALUE; long minNeighborId = 0; - for (Edge<Long, Long> edge: edges) { + for (Edge<Long, Long> edge : edges) { if (edge.getValue() < weight) { weight = edge.getValue(); minNeighborId = edge.getTarget(); @@ -424,11 +423,11 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { + long weight = Long.MIN_VALUE; - for (Edge<Long, Long> edge: edges) { + for (Edge<Long, Long> edge : edges) { if (edge.getValue() > weight) { weight = edge.getValue(); } @@ -460,12 +459,12 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Vertex<Long, Long> v, - Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception { + long weight = Long.MAX_VALUE; long minNeighborId = 0; - - for (Edge<Long, Long> edge: edges) { + + for (Edge<Long, Long> edge : edges) { if (edge.getValue() < weight) { weight = edge.getValue(); minNeighborId = edge.getSource(); @@ -480,9 +479,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { - for(Tuple2<Long, Edge<Long, Long>> edge : edges) { + for (Tuple2<Long, Edge<Long, Long>> edge : edges) { out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } @@ -493,10 +492,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { - for(Tuple2<Long, Edge<Long, Long>> edge : edges) { - if(edge.f0 != 5) { + for (Tuple2<Long, Edge<Long, Long>> edge : edges) { + if (edge.f0 != 5) { out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget())); } } @@ -505,13 +504,14 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @SuppressWarnings("serial") private static final class SelectOutNeighborsValueGreaterThanTwo implements - EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { - for (Edge<Long, Long> edge: edges) { - if(v.getValue() > 2) { + Collector<Tuple2<Long, Long>> out) throws Exception { + + for (Edge<Long, Long> edge : edges) { + if (v.getValue() > 2) { out.collect(new Tuple2<>(v.getId(), edge.getTarget())); } } @@ -523,9 +523,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { - for(Tuple2<Long, Edge<Long, Long>> edge : edges) { + for (Tuple2<Long, Edge<Long, Long>> edge : edges) { out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } } @@ -536,10 +536,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { - for(Tuple2<Long, Edge<Long, Long>> edge : edges) { - if(edge.f0 != 5) { + for (Tuple2<Long, Edge<Long, Long>> edge : edges) { + if (edge.f0 != 5) { out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } } @@ -548,13 +548,14 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @SuppressWarnings("serial") private static final class SelectInNeighborsValueGreaterThanTwo implements - EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { - for (Edge<Long, Long> edge: edges) { - if(v.getValue() > 2) { + Collector<Tuple2<Long, Long>> out) throws Exception { + + for (Edge<Long, Long> edge : edges) { + if (v.getValue() > 2) { out.collect(new Tuple2<>(v.getId(), edge.getSource())); } } @@ -566,7 +567,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { for (Tuple2<Long, Edge<Long, Long>> edge : edges) { if (Objects.equals(edge.f0, edge.f1.getTarget())) { out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); @@ -582,9 +583,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @Override public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { + for (Tuple2<Long, Edge<Long, Long>> edge : edges) { - if(edge.f0 != 5 && edge.f0 != 2) { + if (edge.f0 != 5 && edge.f0 != 2) { if (Objects.equals(edge.f0, edge.f1.getTarget())) { out.collect(new Tuple2<>(edge.f0, edge.f1.getSource())); } else { @@ -597,14 +599,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { @SuppressWarnings("serial") private static final class SelectNeighborsValueGreaterThanFour implements - EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges, - Collector<Tuple2<Long, Long>> out) throws Exception { - for(Edge<Long, Long> edge : edges) { - if(v.getValue() > 4) { - if(v.getId().equals(edge.getTarget())) { + Collector<Tuple2<Long, Long>> out) throws Exception { + + for (Edge<Long, Long> edge : edges) { + if (v.getValue() > 4) { + if (v.getId().equals(edge.getTarget())) { out.collect(new Tuple2<>(v.getId(), edge.getSource())); } else { out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
http://git-wip-us.apache.org/repos/asf/flink/blob/cb282067/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java index a352bb4..7fad2e8 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java @@ -41,11 +41,11 @@ import java.util.List; @RunWith(Parameterized.class) public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { - public ReduceOnNeighborMethodsITCase(TestExecutionMode mode){ + public ReduceOnNeighborMethodsITCase(TestExecutionMode mode) { super(mode); } - private String expectedResult; + private String expectedResult; @Test public void testSumOfOutNeighbors() throws Exception { @@ -54,19 +54,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); - + expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + compareResultAsTuples(result, expectedResult); } @@ -77,22 +77,20 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * times the edge weights for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithSum.collect(); - + expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + compareResultAsTuples(result, expectedResult); - - } @Test @@ -103,19 +101,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,11\n" + - "2,6\n" + - "3,15\n" + - "4,12\n" + - "5,13\n"; - + "2,6\n" + + "3,15\n" + + "4,12\n" + + "5,13\n"; + compareResultAsTuples(result, expectedResult); } @@ -127,15 +125,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); - + expectedResult = "4,5\n" + - "5,1\n"; - + "5,1\n"; + compareResultAsTuples(result, expectedResult); } @@ -147,15 +145,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithSum.collect(); - + expectedResult = "4,102\n" + - "5,285\n"; - + "5,285\n"; + compareResultAsTuples(result, expectedResult); } @@ -168,15 +166,15 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "4,12\n" + - "5,13\n"; - + "5,13\n"; + compareResultAsTuples(result, expectedResult); } @@ -187,19 +185,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT); + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + compareResultAsTuples(result, expectedResult); } @@ -210,19 +208,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * times the edge weights for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); - DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.groupReduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithSum.collect(); - + expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + compareResultAsTuples(result, expectedResult); } @@ -233,19 +231,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { * for each vertex */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues = - graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL); + graph.reduceOnNeighbors(new SumNeighbors(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithSumOfAllNeighborValues.collect(); - + expectedResult = "1,10\n" + - "2,4\n" + - "3,12\n" + - "4,8\n" + - "5,8\n"; - + "2,4\n" + + "3,12\n" + + "4,8\n" + + "5,8\n"; + compareResultAsTuples(result, expectedResult); } @@ -257,19 +255,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "3,9\n" + - "3,18\n" + - "4,5\n" + - "4,10\n" + - "5,1\n" + - "5,2"; - + "3,18\n" + + "4,5\n" + + "4,10\n" + + "5,1\n" + + "5,2"; + compareResultAsTuples(result, expectedResult); } @@ -281,19 +279,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "3,59\n" + - "3,118\n" + - "4,204\n" + - "4,102\n" + - "5,570\n" + - "5,285"; - + "3,118\n" + + "4,204\n" + + "4,102\n" + + "5,570\n" + + "5,285"; + compareResultAsTuples(result, expectedResult); } @@ -305,19 +303,19 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues = - graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithSumOfAllNeighborValues.collect(); expectedResult = "3,12\n" + - "3,24\n" + - "4,8\n" + - "4,16\n" + - "5,8\n" + - "5,16"; - + "3,24\n" + + "4,8\n" + + "4,16\n" + + "5,8\n" + + "5,16"; + compareResultAsTuples(result, expectedResult); } @@ -329,23 +327,23 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); + graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); - + expectedResult = "1,5\n" + - "1,10\n" + - "2,3\n" + - "2,6\n" + - "3,9\n" + - "3,18\n" + - "4,5\n" + - "4,10\n" + - "5,1\n" + - "5,2"; - + "1,10\n" + + "2,3\n" + + "2,6\n" + + "3,9\n" + + "3,18\n" + + "4,5\n" + + "4,10\n" + + "5,1\n" + + "5,2"; + compareResultAsTuples(result, expectedResult); } @@ -357,23 +355,23 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSum = - graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); + graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN); List<Tuple2<Long, Long>> result = verticesWithSum.collect(); - + expectedResult = "1,255\n" + - "1,254\n" + - "2,12\n" + - "2,11\n" + - "3,59\n" + - "3,58\n" + - "4,102\n" + - "4,101\n" + - "5,285\n" + - "5,284"; - + "1,254\n" + + "2,12\n" + + "2,11\n" + + "3,59\n" + + "3,58\n" + + "4,102\n" + + "4,101\n" + + "5,285\n" + + "5,284"; + compareResultAsTuples(result, expectedResult); } @@ -386,35 +384,35 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); + TestGraphUtils.getLongLongEdgeData(env), env); DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = - graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); + graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL); List<Tuple2<Long, Long>> result = verticesWithSumOfOutNeighborValues.collect(); expectedResult = "1,11\n" + - "1,16\n" + - "2,6\n" + - "2,11\n" + - "3,15\n" + - "3,20\n" + - "4,12\n" + - "4,17\n" + - "5,13\n" + - "5,18"; - + "1,16\n" + + "2,6\n" + + "2,11\n" + + "3,15\n" + + "3,20\n" + + "4,12\n" + + "4,17\n" + + "5,13\n" + + "5,18"; + compareResultAsTuples(result, expectedResult); } @SuppressWarnings("serial") - private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumOutNeighbors implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { + long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { sum += neighbor.f1.getValue(); @@ -424,14 +422,14 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumInNeighbors implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { + long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { sum += neighbor.f0.getValue() * neighbor.f1.getValue(); @@ -441,14 +439,14 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumAllNeighbors implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { - + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { + long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { sum += neighbor.f1.getValue(); @@ -458,57 +456,57 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumOutNeighborsIdGreaterThanThree implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { - sum += neighbor.f1.getValue(); + sum += neighbor.f1.getValue(); } - if(vertex.getId() > 3) { + if (vertex.getId() > 3) { out.collect(new Tuple2<>(vertex.getId(), sum)); } } } @SuppressWarnings("serial") - private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumInNeighborsIdGreaterThanThree implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { sum += neighbor.f0.getValue() * neighbor.f1.getValue(); } - if(vertex.getId() > 3) { + if (vertex.getId() > 3) { out.collect(new Tuple2<>(vertex.getId(), sum)); } } } @SuppressWarnings("serial") - private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumAllNeighborsIdGreaterThanThree implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { sum += neighbor.f1.getValue(); } - if(vertex.getId() > 3) { + if (vertex.getId() > 3) { out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue())); } } @@ -524,12 +522,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { + long sum = 0; Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { @@ -541,12 +539,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements + NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; @@ -554,7 +552,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { next = neighbor; sum += next.f2.getValue(); } - if(next.f0 > 2) { + if (next.f0 > 2) { out.collect(new Tuple2<>(next.f0, sum)); out.collect(new Tuple2<>(next.f0, sum * 2)); } @@ -562,12 +560,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements + NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; @@ -575,7 +573,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { next = neighbor; sum += next.f2.getValue() * next.f1.getValue(); } - if(next.f0 > 2) { + if (next.f0 > 2) { out.collect(new Tuple2<>(next.f0, sum)); out.collect(new Tuple2<>(next.f0, sum * 2)); } @@ -583,12 +581,12 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements + NeighborsFunction<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; @@ -596,7 +594,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { next = neighbor; sum += next.f2.getValue(); } - if(next.f0 > 2) { + if (next.f0 > 2) { out.collect(new Tuple2<>(next.f0, sum)); out.collect(new Tuple2<>(next.f0, sum * 2)); } @@ -604,13 +602,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumOutNeighborsMultipliedByTwo implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { @@ -622,13 +620,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumInNeighborsSubtractOne implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { @@ -640,13 +638,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long, - Tuple2<Long, Long>> { + private static final class SumAllNeighborsAddFive implements + NeighborsFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { @Override public void iterateNeighbors(Vertex<Long, Long> vertex, - Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, - Collector<Tuple2<Long, Long>> out) throws Exception { + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors, + Collector<Tuple2<Long, Long>> out) throws Exception { long sum = 0; for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
