http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java new file mode 100644 index 0000000..29d76f0 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.EdgesFunction; +import org.apache.flink.graph.EdgesFunctionWithVertexValue; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { + + public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testLowestWeightOutNeighbor() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testLowestWeightInNeighbor() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + + @Test + public void testMaxWeightEdge() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @Test + public void testLowestWeightOutNeighborNoValue() throws Exception { + /* + * Get the lowest-weight out-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,3\n" + + "3,4\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testLowestWeightInNeighborNoValue() throws Exception { + /* + * Get the lowest-weight in-neighbor + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = + graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); + verticesWithLowestOutNeighbor.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,1\n" + + "3,1\n" + + "4,3\n" + + "5,3\n"; + } + + @Test + public void testMaxWeightAllNeighbors() throws Exception { + /* + * Get the maximum weight among all edges + * of a vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = + graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); + verticesWithMaxEdgeWeight.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,51\n" + + "2,23\n" + + "3,35\n" + + "4,45\n" + + "5,51\n"; + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getTarget(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MIN_VALUE; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() > weight) { + weight = edge.getValue(); + } + } + return new Tuple2<Long, Long>(v.getId(), weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getTarget(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MIN_VALUE; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() > weight) { + weight = edge.f1.getValue(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, weight); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges( + Vertex<Long, Long> v, + Iterable<Edge<Long, Long>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + + for (Edge<Long, Long> edge: edges) { + if (edge.getValue() < weight) { + weight = edge.getValue(); + minNeighorId = edge.getSource(); + } + } + return new Tuple2<Long, Long>(v.getId(), minNeighorId); + } + } + + @SuppressWarnings("serial") + private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) { + + long weight = Long.MAX_VALUE; + long minNeighorId = 0; + long vertexId = -1; + long i=0; + + for (Tuple2<Long, Edge<Long, Long>> edge: edges) { + if (edge.f1.getValue() < weight) { + weight = edge.f1.getValue(); + minNeighorId = edge.f1.getSource(); + } + if (i==0) { + vertexId = edge.f0; + } i++; + } + return new Tuple2<Long, Long>(vertexId, minNeighorId); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java new file mode 100644 index 0000000..d385399 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import java.util.Iterator; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.NeighborsFunction; +import org.apache.flink.graph.NeighborsFunctionWithVertexValue; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { + + public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testSumOfOutNeighbors() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighbors() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfOAllNeighbors() throws Exception { + /* + * Get the sum of all neighbor values + * including own vertex value + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,11\n" + + "2,6\n" + + "3,15\n" + + "4,12\n" + + "5,13\n"; + } + + @Test + public void testSumOfOutNeighborsNoValue() throws Exception { + /* + * Get the sum of out-neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,5\n" + + "2,3\n" + + "3,9\n" + + "4,5\n" + + "5,1\n"; + } + + @Test + public void testSumOfInNeighborsNoValue() throws Exception { + /* + * Get the sum of in-neighbor values + * times the edge weights for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSum = + graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); + + verticesWithSum.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,255\n" + + "2,12\n" + + "3,59\n" + + "4,102\n" + + "5,285\n"; + } + + @Test + public void testSumOfAllNeighborsNoValue() throws Exception { + /* + * Get the sum of all neighbor values + * for each vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = + graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); + + verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,10\n" + + "2,4\n" + + "3,12\n" + + "4,8\n" + + "5,8\n"; + } + + @SuppressWarnings("serial") + private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f0.getValue() * neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex, + Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) { + sum += neighbor.f1.getValue(); + } + return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()); + } + } + + @SuppressWarnings("serial") + private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue() * next.f1.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } + + @SuppressWarnings("serial") + private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, + Tuple2<Long, Long>> { + + public Tuple2<Long, Long> iterateNeighbors( + Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) { + + long sum = 0; + Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null; + Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator = + neighbors.iterator(); + while(neighborsIterator.hasNext()) { + next = neighborsIterator.next(); + sum += next.f2.getValue(); + } + return new Tuple2<Long, Long>(next.f0, sum); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java deleted file mode 100644 index c572647..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestDegrees.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestDegrees extends MultipleProgramsTestBase { - - public TestDegrees(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testOutDegrees() throws Exception { - /* - * Test outDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,1\n" + - "3,2\n" + - "4,1\n" + - "5,1\n"; - } - - @Test - public void testOutDegreesWithNoOutEdges() throws Exception { - /* - * Test outDegrees() no outgoing edges - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,3\n" + - "2,1\n" + - "3,1\n" + - "4,1\n" + - "5,0\n"; - } - - @Test - public void testInDegrees() throws Exception { - /* - * Test inDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,1\n" + - "2,1\n" + - "3,2\n" + - "4,1\n" + - "5,2\n"; - } - - @Test - public void testInDegreesWithNoInEdge() throws Exception { - /* - * Test inDegrees() no ingoing edge - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); - - graph.inDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0\n" + - "2,1\n" + - "3,1\n" + - "4,1\n" + - "5,3\n"; - } - - @Test - public void testGetDegrees() throws Exception { - /* - * Test getDegrees() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,3\n" + - "2,2\n" + - "3,4\n" + - "4,2\n" + - "5,3\n"; - } - - @Test - public void testGetDegreesWithDisconnectedData() throws Exception { - /* - * Test getDegrees() with disconnected data - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, Long> graph = - Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); - - graph.outDegrees().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,1\n" + - "3,0\n" + - "4,1\n" + - "5,0\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java deleted file mode 100644 index e1b96e8..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestFromCollection.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestFromCollection extends MultipleProgramsTestBase { - - public TestFromCollection(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testFromCollectionVerticesEdges() throws Exception { - /* - * Test fromCollection(vertices, edges): - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), - TestGraphUtils.getLongLongEdges(), env); - - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testFromCollectionEdgesNoInitialValue() throws Exception { - /* - * Test fromCollection(edges) with no initial value for the vertices - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, NullValue, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), - env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; - } - - @Test - public void testFromCollectionEdgesWithInitialValue() throws Exception { - /* - * Test fromCollection(edges) with vertices initialised by a - * function that takes the id and doubles it - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), - new InitVerticesMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,4\n" + - "3,6\n" + - "4,8\n" + - "5,10\n"; - } - - @SuppressWarnings("serial") - private static final class InitVerticesMapper implements MapFunction<Long, Long> { - public Long map(Long vertexId) { - return vertexId * 2; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java deleted file mode 100644 index d3e3bda..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreation.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.validation.InvalidVertexIdsValidator; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestGraphCreation extends MultipleProgramsTestBase { - - public TestGraphCreation(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testCreateWithoutVertexValues() throws Exception { - /* - * Test create() with edge dataset and no vertex values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, NullValue, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(null)\n" + - "2,(null)\n" + - "3,(null)\n" + - "4,(null)\n" + - "5,(null)\n"; - } - - @Test - public void testCreateWithMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns the id as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), - new AssignIdAsValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n"; - } - - @Test - public void testCreateWithCustomVertexValue() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, DummyCustomParameterizedType<Double>, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(2.0,0)\n" + - "2,(4.0,1)\n" + - "3,(6.0,2)\n" + - "4,(8.0,3)\n" + - "5,(10.0,4)\n"; - } - - @Test - public void testValidate() throws Exception { - /* - * Test validate(): - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongVertexData(env); - DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); - - result.writeAsText(resultPath); - env.execute(); - - expectedResult = "true\n"; - } - - @Test - public void testValidateWithInvalidIds() throws Exception { - /* - * Test validate() - invalid vertex ids - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Long>> vertices = TestGraphUtils.getLongLongInvalidVertexData(env); - DataSet<Edge<Long, Long>> edges = TestGraphUtils.getLongLongEdgeData(env); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(vertices, edges, env); - DataSet<Boolean> result = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>()); - result.writeAsText(resultPath); - env.execute(); - - expectedResult = "false\n"; - } - - @SuppressWarnings("serial") - private static final class AssignIdAsValueMapper implements MapFunction<Long, Long> { - public Long map(Long vertexId) { - return vertexId; - } - } - - @SuppressWarnings("serial") - private static final class AssignCustomVertexValueMapper implements - MapFunction<Long, DummyCustomParameterizedType<Double>> { - - DummyCustomParameterizedType<Double> dummyValue = - new DummyCustomParameterizedType<Double>(); - - public DummyCustomParameterizedType<Double> map(Long vertexId) { - dummyValue.setIntField(vertexId.intValue()-1); - dummyValue.setTField(vertexId*2.0); - return dummyValue; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java deleted file mode 100644 index 67ff5cc..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphCreationWithMapper.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestGraphCreationWithMapper extends MultipleProgramsTestBase { - - public TestGraphCreationWithMapper(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithDoubleValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a double constant as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), - new AssignDoubleValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; - } - - @Test - public void testWithTuple2ValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a Tuple2 as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Tuple2<Long, Long>, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(2,42)\n" + - "2,(4,42)\n" + - "3,(6,42)\n" + - "4,(8,42)\n" + - "5,(10,42)\n"; - } - - @Test - public void testWithConstantValueMapper() throws Exception { - /* - * Test create() with edge dataset with String key type - * and a mapper that assigns a double constant as value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<String, Double, Long> graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), - new AssignDoubleConstantMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,0.1\n" + - "2,0.1\n" + - "3,0.1\n" + - "4,0.1\n" + - "5,0.1\n"; - } - - @Test - public void testWithDCustomValueMapper() throws Exception { - /* - * Test create() with edge dataset and a mapper that assigns a custom vertex value - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, DummyCustomType, Long> graph = Graph.fromDataSet( - TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); - - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,(F,0)\n" + - "2,(F,1)\n" + - "3,(F,2)\n" + - "4,(F,3)\n" + - "5,(F,4)\n"; - } - - @SuppressWarnings("serial") - private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> { - public Double map(Long value) { - return 0.1d; - } - } - - @SuppressWarnings("serial") - private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Long vertexId) { - return new Tuple2<Long, Long>(vertexId*2, 42l); - } - } - - @SuppressWarnings("serial") - private static final class AssignDoubleConstantMapper implements MapFunction<String, Double> { - public Double map(String value) { - return 0.1d; - } - } - - @SuppressWarnings("serial") - private static final class AssignCustomValueMapper implements MapFunction<Long, DummyCustomType> { - public DummyCustomType map(Long vertexId) { - return new DummyCustomType(vertexId.intValue()-1, false); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java deleted file mode 100644 index f53f51e..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphMutations.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestGraphMutations extends MultipleProgramsTestBase { - - public TestGraphMutations(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testAddVertex() throws Exception { - /* - * Test addVertex() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - edges.add(new Edge<Long, Long>(6L, 1L, 61L)); - graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - - @Test - public void testAddVertexExisting() throws Exception { - /* - * Test addVertex() -- add an existing vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - edges.add(new Edge<Long, Long>(1L, 5L, 15L)); - graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L), edges); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "1,5,15\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testAddVertexNoEdges() throws Exception { - /* - * Test addVertex() -- add vertex with empty edge set - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L), edges); - graph.getVertices().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,1\n" + - "2,2\n" + - "3,3\n" + - "4,4\n" + - "5,5\n" + - "6,6\n"; - } - - @Test - public void testRemoveVertex() throws Exception { - /* - * Test removeVertex() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n"; - } - - @Test - public void testRemoveInvalidVertex() throws Exception { - /* - * Test removeVertex() -- remove an invalid vertex - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testAddEdge() throws Exception { - /* - * Test addEdge() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L), - 61L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } - - @Test - public void testAddExistingEdge() throws Exception { - /* - * Test addEdge() -- add already existing edge - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L), - 12L); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testRemoveVEdge() throws Exception { - /* - * Test removeEdge() -- simple case - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n"; - } - - @Test - public void testRemoveInvalidEdge() throws Exception { - /* - * Test removeEdge() -- invalid edge - */ - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L)); - graph.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java deleted file mode 100644 index 6ab6928..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphOperations.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestGraphOperations extends MultipleProgramsTestBase { - - public TestGraphOperations(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testUndirected() throws Exception { - /* - * Test getUndirected() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.getUndirected().getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2,12\n" + "2,1,12\n" + - "1,3,13\n" + "3,1,13\n" + - "2,3,23\n" + "3,2,23\n" + - "3,4,34\n" + "4,3,34\n" + - "3,5,35\n" + "5,3,35\n" + - "4,5,45\n" + "5,4,45\n" + - "5,1,51\n" + "1,5,51\n"; - } - - @Test - public void testReverse() throws Exception { - /* - * Test reverse() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - graph.reverse().getEdges().writeAsCsv(resultPath); - env.execute(); - expectedResult = "2,1,12\n" + - "3,1,13\n" + - "3,2,23\n" + - "4,3,34\n" + - "5,3,35\n" + - "5,4,45\n" + - "1,5,51\n"; - } - - @SuppressWarnings("serial") - @Test - public void testSubGraph() throws Exception { - /* - * Test subgraph: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.subgraph(new FilterFunction<Vertex<Long, Long>>() { - public boolean filter(Vertex<Long, Long> vertex) throws Exception { - return (vertex.getValue() > 2); - } - }, - new FilterFunction<Edge<Long, Long>>() { - public boolean filter(Edge<Long, Long> edge) throws Exception { - return (edge.getValue() > 34); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,5,35\n" + - "4,5,45\n"; - } - - @SuppressWarnings("serial") - @Test - public void testFilterVertices() throws Exception { - /* - * Test filterOnVertices: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnVertices(new FilterFunction<Vertex<Long, Long>>() { - public boolean filter(Vertex<Long, Long> vertex) throws Exception { - return (vertex.getValue() > 2); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n"; - } - - @SuppressWarnings("serial") - @Test - public void testFilterEdges() throws Exception { - /* - * Test filterOnEdges: - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.filterOnEdges(new FilterFunction<Edge<Long, Long>>() { - public boolean filter(Edge<Long, Long> edge) throws Exception { - return (edge.getValue() > 34); - } - }).getEdges().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testNumberOfVertices() throws Exception { - /* - * Test numberOfVertices() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfVertices().writeAsText(resultPath); - - env.execute(); - expectedResult = "5"; - } - - @Test - public void testNumberOfEdges() throws Exception { - /* - * Test numberOfEdges() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.numberOfEdges().writeAsText(resultPath); - - env.execute(); - expectedResult = "7"; - } - - @Test - public void testVertexIds() throws Exception { - /* - * Test getVertexIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getVertexIds().writeAsText(resultPath); - - env.execute(); - expectedResult = "1\n2\n3\n4\n5\n"; - } - - @Test - public void testEdgesIds() throws Exception { - /* - * Test getEdgeIds() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - graph.getEdgeIds().writeAsCsv(resultPath); - - env.execute(); - expectedResult = "1,2\n" + "1,3\n" + - "2,3\n" + "3,4\n" + - "3,5\n" + "4,5\n" + - "5,1\n"; - } - - @Test - public void testUnion() throws Exception { - /* - * Test union() - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>(); - List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>(); - - vertices.add(new Vertex<Long, Long>(6L, 6L)); - edges.add(new Edge<Long, Long>(6L, 1L, 61L)); - - graph = graph.union(Graph.fromCollection(vertices, edges, env)); - - graph.getEdges().writeAsCsv(resultPath); - - env.execute(); - - expectedResult = "1,2,12\n" + - "1,3,13\n" + - "2,3,23\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n" + - "6,1,61\n"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/6296c394/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java deleted file mode 100644 index 0be97be..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java +++ /dev/null @@ -1,519 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.utils.EdgeToTuple3Map; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class TestJoinWithEdges extends MultipleProgramsTestBase { - - public TestJoinWithEdges(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithEdgesInputDataset() throws Exception { - /* - * Test joinWithEdges with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges() - .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,90\n" + - "5,1,102\n"; - } - - @Test - public void testWithLessElements() throws Exception { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new EdgeToTuple3Map<Long, Long>()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithLessElementsDifferentType() throws Exception { - /* - * Test joinWithEdges with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3) - .map(new BooleanEdgeValueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdges with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test joinWithEdges with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,20\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithEdgesOnSource() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges() - .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,68\n" + - "3,5,69\n" + - "4,5,90\n" + - "5,1,102\n"; - } - - @Test - public void testOnSourceWithLessElements() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new ProjectSourceAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,25\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithDifferentType() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3) - .map(new ProjectSourceWithTrueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdgesOnSource with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,20\n" + - "1,3,20\n" + - "2,3,60\n" + - "3,4,80\n" + - "3,5,80\n" + - "4,5,120\n" + - "5,1,51\n"; - } - - @Test - public void testOnSourceWithCustom() throws Exception { - /* - * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,10\n" + - "2,3,30\n" + - "3,4,40\n" + - "3,5,40\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testWithEdgesOnTarget() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet parameter identical - * to the edge DataSet - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges() - .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,68\n" + - "3,5,70\n" + - "4,5,80\n" + - "5,1,102\n"; - } - - @Test - public void testWithOnTargetWithLessElements() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet, but of the same type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new ProjectTargetAndValueMapper()), new AddValuesMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,36\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnTargetWithDifferentType() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing - * less elements than the edge DataSet and of a different type(Boolean) - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3) - .map(new ProjectTargetWithTrueMapper()), new DoubleIfTrueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,24\n" + - "1,3,26\n" + - "2,3,46\n" + - "3,4,34\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @Test - public void testOnTargetWithNoCommonKeys() throws Exception { - /* - * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the edge DataSet - * - the iterator becomes empty. - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env), - new DoubleValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,20\n" + - "1,3,40\n" + - "2,3,40\n" + - "3,4,80\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,140\n"; - } - - @Test - public void testOnTargetWithCustom() throws Exception { - /* - * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type input values - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env), - new CustomValueMapper()); - - result.getEdges().writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2,10\n" + - "1,3,20\n" + - "2,3,20\n" + - "3,4,40\n" + - "3,5,35\n" + - "4,5,45\n" + - "5,1,51\n"; - } - - @SuppressWarnings("serial") - private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f0 + tuple.f1; - } - } - - @SuppressWarnings("serial") - private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> { - public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple3<Long, Long, Boolean>(edge.getSource(), - edge.getTarget(), true); - } - } - - @SuppressWarnings("serial") - private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> { - public Long map(Tuple2<Long, Boolean> tuple) throws Exception { - if(tuple.f1) { - return tuple.f0 * 2; - } - else { - return tuple.f0; - } - } - } - - @SuppressWarnings("serial") - private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> { - public Long map(Tuple2<Long, Long> tuple) throws Exception { - return tuple.f1 * 2; - } - } - - @SuppressWarnings("serial") - private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> { - public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception { - return (long) tuple.f1.getIntField(); - } - } - - @SuppressWarnings("serial") - private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getSource(), edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getSource(), true); - } - } - - @SuppressWarnings("serial") - private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> { - public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> { - public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception { - return new Tuple2<Long, Boolean>(edge.getTarget(), true); - } - } -} \ No newline at end of file