http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java index 866f334..12047e7 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java @@ -41,7 +41,7 @@ import org.apache.flink.types.NullValue; * * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. * - * @see org.apache.flink.graph.library.GSAConnectedComponents + * @see GSAConnectedComponents */ @SuppressWarnings("serial") public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java index a44ba14..0354da4 100755 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java @@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue; * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs. * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID. * - * @see org.apache.flink.graph.library.ConnectedComponents + * @see ConnectedComponents */ public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> { http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java index 0c5080d..29183e9 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java @@ -116,4 +116,4 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java index c6bba4c..3842e6c 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -353,4 +353,4 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements this.setField(vertex3, V3); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java new file mode 100644 index 0000000..2f619a6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.testdata.ConnectedComponentsData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.types.NullValue; + +import java.io.BufferedReader; + +@SuppressWarnings("serial") +public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase { + + private static final long SEED = 9487520347802987L; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + private String resultPath; + + @Override + protected void preSubmit() throws Exception { + resultPath = getTempFilePath("results"); + } + + @Override + protected void testProgram() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES); + DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n")); + + DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser()); + + DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner()); + + Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); + + DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100)); + + result.writeAsCsv(resultPath, "\n", " "); + env.execute(); + } + + /** + * A map function that takes a Long value and creates a 2-tuple out of it: + * <pre>(Long value) -> (value, value)</pre> + */ + public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> { + @Override + public Vertex<Long, Long> map(Long value) { + return new Vertex<>(value, value); + } + } + + @Override + protected void postSubmit() throws Exception { + for (BufferedReader reader : getResultReader(resultPath)) { + ConnectedComponentsData.checkOddEvenResult(reader); + } + } + + public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> { + public Edge<Long, NullValue> map(String value) { + String[] nums = value.split(" "); + return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), + NullValue.getInstance()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java deleted file mode 100755 index 039a05c..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.graph.library.GSAConnectedComponents; -import org.apache.flink.graph.library.GSASingleSourceShortestPaths; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class GatherSumApplyITCase extends MultipleProgramsTestBase { - - public GatherSumApplyITCase(TestExecutionMode mode){ - super(mode); - } - - private String expectedResult; - - // -------------------------------------------------------------------------------------------- - // Connected Components Test - // -------------------------------------------------------------------------------------------- - - @Test - public void testConnectedComponents() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet( - ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env), - new InitMapperCC(), env); - - List<Vertex<Long, Long>> result = inputGraph.run( - new GSAConnectedComponents<Long, NullValue>(16)).collect(); - - expectedResult = "1,1\n" + - "2,1\n" + - "3,1\n" + - "4,1\n"; - - compareResultAsTuples(result, expectedResult); - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path Test - // -------------------------------------------------------------------------------------------- - - @Test - public void testSingleSourceShortestPaths() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), - new InitMapperSSSP(), env); - - List<Vertex<Long, Double>> result = inputGraph.run( - new GSASingleSourceShortestPaths<>(1L, 16)).collect(); - - expectedResult = "1,0.0\n" + - "2,12.0\n" + - "3,13.0\n" + - "4,47.0\n" + - "5,48.0\n"; - - compareResultAsTuples(result, expectedResult); - } - - @SuppressWarnings("serial") - private static final class InitMapperCC implements MapFunction<Long, Long> { - public Long map(Long value) { - return value; - } - } - - @SuppressWarnings("serial") - private static final class InitMapperSSSP implements MapFunction<Long, Double> { - public Double map(Long value) { - return 0.0; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java deleted file mode 100644 index b0bacc4..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.example.ConnectedComponents; -import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class ConnectedComponentsITCase extends MultipleProgramsTestBase { - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public ConnectedComponentsITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - - File edgesFile = tempFolder.newFile(); - Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8); - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testConnectedComponentsExample() throws Exception { - ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""}); - expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java deleted file mode 100644 index 183c429..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.example.EuclideanGraphWeighing; -import org.apache.flink.graph.example.utils.EuclideanGraphData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase { - - private String verticesPath; - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public EuclideanGraphWeighingITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - File verticesFile = tempFolder.newFile(); - Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8); - - File edgesFile = tempFolder.newFile(); - Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8); - - verticesPath = verticesFile.toURI().toString(); - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testGraphWeightingWeighing() throws Exception { - EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath}); - expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java deleted file mode 100644 index b4cdfd5..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.IncrementalSSSP; -import org.apache.flink.graph.example.utils.IncrementalSSSPData; -import org.apache.flink.graph.spargel.ScatterGatherConfiguration; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class IncrementalSSSPITCase extends MultipleProgramsTestBase { - - private String verticesPath; - - private String edgesPath; - - private String edgesInSSSPPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public IncrementalSSSPITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - File verticesFile = tempFolder.newFile(); - Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8); - - File edgesFile = tempFolder.newFile(); - Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8); - - File edgesInSSSPFile = tempFolder.newFile(); - Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8); - - verticesPath = verticesFile.toURI().toString(); - edgesPath = edgesFile.toURI().toString(); - edgesInSSSPPath = edgesInSSSPFile.toURI().toString(); - } - - @Test - public void testIncrementalSSSP() throws Exception { - IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath, - IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED, - IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, IncrementalSSSPData.NUM_VERTICES + ""}); - expected = IncrementalSSSPData.RESULTED_VERTICES; - } - - @Test - public void testIncrementalSSSPNonSPEdge() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env); - DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env); - DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env); - // the edge to be removed is a non-SP edge - Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0); - - Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env); - // Assumption: all minimum weight paths are kept - Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env); - // remove the edge - graph.removeEdge(edgeToBeRemoved); - - // configure the iteration - ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); - - if(IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) { - - parameters.setDirection(EdgeDirection.IN); - parameters.setOptDegrees(true); - - // run the scatter gather iteration to propagate info - Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration( - new IncrementalSSSP.VertexDistanceUpdater(), - new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved), - IncrementalSSSPData.NUM_VERTICES, parameters); - - DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); - - resultedVertices.writeAsCsv(resultPath, "\n", ","); - env.execute(); - } else { - vertices.writeAsCsv(resultPath, "\n", ","); - env.execute(); - } - - expected = IncrementalSSSPData.VERTICES; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java deleted file mode 100644 index 294a756..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; -import org.apache.flink.graph.example.JaccardSimilarityMeasure; -import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase { - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public JaccardSimilarityMeasureITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - - File edgesFile = tempFolder.newFile(); - Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8); - - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testJaccardSimilarityMeasureExample() throws Exception { - JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath}); - expected = JaccardSimilarityMeasureData.JACCARD_EDGES; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java deleted file mode 100644 index 8152885..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; - -import org.apache.flink.graph.example.MusicProfiles; -import org.apache.flink.graph.example.utils.MusicProfilesData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; - -@RunWith(Parameterized.class) -public class MusicProfilesITCase extends MultipleProgramsTestBase { - - private String tripletsPath; - - private String mismatchesPath; - - private String topSongsResultPath; - - private String communitiesResultPath; - - private String expectedTopSongs; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public MusicProfilesITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - topSongsResultPath = tempFolder.newFile().toURI().toString(); - communitiesResultPath = tempFolder.newFile().toURI().toString(); - - File tripletsFile = tempFolder.newFile(); - Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8); - tripletsPath = tripletsFile.toURI().toString(); - - File mismatchesFile = tempFolder.newFile(); - Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8); - mismatchesPath = mismatchesFile.toURI().toString(); - } - - @Test - public void testMusicProfilesExample() throws Exception { - MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath, - MusicProfilesData.MAX_ITERATIONS + ""}); - expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath); - - ArrayList<String> list = new ArrayList<>(); - readAllResultLines(list, communitiesResultPath, new String[]{}, false); - - String[] result = list.toArray(new String[list.size()]); - Arrays.sort(result); - - // check that user_1 and user_2 are in the same community - Assert.assertEquals("users 1 and 2 are not in the same community", - result[0].substring(7), result[1].substring(7)); - - // check that user_3, user_4 and user_5 are in the same community - Assert.assertEquals("users 3 and 4 are not in the same community", - result[2].substring(7), result[3].substring(7)); - Assert.assertEquals("users 4 and 5 are not in the same community", - result[3].substring(7), result[4].substring(7)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java deleted file mode 100644 index d8f8c8f..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.example; - -import com.google.common.base.Charsets; -import com.google.common.io.Files; - -import org.apache.flink.graph.example.GSASingleSourceShortestPaths; -import org.apache.flink.graph.example.SingleSourceShortestPaths; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; - -@RunWith(Parameterized.class) -public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase { - - private String edgesPath; - - private String resultPath; - - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - public SingleSourceShortestPathsITCase(TestExecutionMode mode) { - super(mode); - } - - @Before - public void before() throws Exception { - resultPath = tempFolder.newFile().toURI().toString(); - - File edgesFile = tempFolder.newFile(); - Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8); - edgesPath = edgesFile.toURI().toString(); - } - - @Test - public void testSSSPExample() throws Exception { - SingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "", - edgesPath, resultPath, 10 + ""}); - expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; - } - - @Test - public void testGSASSSPExample() throws Exception { - GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "", - edgesPath, resultPath, 10 + ""}); - expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; - } - - @After - public void after() throws Exception { - compareResultsByLinesInMemory(expected, resultPath); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java deleted file mode 100644 index 421eaa9..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.CommunityDetectionData; -import org.apache.flink.graph.library.CommunityDetection; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class CommunityDetectionITCase extends MultipleProgramsTestBase { - - public CommunityDetectionITCase(TestExecutionMode mode) { - super(mode); - } - - private String expected; - - @Test - public void testSingleIteration() throws Exception { - /* - * Test one iteration of the Simple Community Detection Example - */ - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Double> inputGraph = Graph.fromDataSet( - CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env); - - List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA)) - .getVertices().collect(); - - expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION; - compareResultAsTuples(result, expected); - } - - @Test - public void testTieBreaker() throws Exception { - /* - * Test one iteration of the Simple Community Detection Example where a tie must be broken - */ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, Long, Double> inputGraph = Graph.fromDataSet( - CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env); - - List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA)) - .getVertices().collect(); - expected = CommunityDetectionData.COMMUNITIES_WITH_TIE; - compareResultAsTuples(result, expected); - } - - @SuppressWarnings("serial") - private static final class InitLabels implements MapFunction<Long, Long>{ - - public Long map(Long id) { - return id; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java deleted file mode 100644 index c8d85f0..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.library.ConnectedComponents; -import org.apache.flink.test.testdata.ConnectedComponentsData; -import org.apache.flink.test.util.JavaProgramTestBase; -import org.apache.flink.types.NullValue; - -import java.io.BufferedReader; - -@SuppressWarnings("serial") -public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase { - - private static final long SEED = 9487520347802987L; - - private static final int NUM_VERTICES = 1000; - - private static final int NUM_EDGES = 10000; - - private String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempFilePath("results"); - } - - @Override - protected void testProgram() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES); - DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n")); - - DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser()); - - DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner()); - - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); - - DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100)); - - result.writeAsCsv(resultPath, "\n", " "); - env.execute(); - } - - /** - * A map function that takes a Long value and creates a 2-tuple out of it: - * <pre>(Long value) -> (value, value)</pre> - */ - public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> { - @Override - public Vertex<Long, Long> map(Long value) { - return new Vertex<>(value, value); - } - } - - @Override - protected void postSubmit() throws Exception { - for (BufferedReader reader : getResultReader(resultPath)) { - ConnectedComponentsData.checkOddEvenResult(reader); - } - } - - public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> { - public Edge<Long, NullValue> map(String value) { - String[] nums = value.split(" "); - return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]), - NullValue.getInstance()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java deleted file mode 100644 index 520269b..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.LabelPropagationData; -import org.apache.flink.graph.library.LabelPropagation; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class LabelPropagationITCase extends MultipleProgramsTestBase { - - public LabelPropagationITCase(TestExecutionMode mode){ - super(mode); - } - - private String expectedResult; - - @Test - public void testSingleIteration() throws Exception { - /* - * Test one iteration of label propagation example with a simple graph - */ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet( - LabelPropagationData.getDefaultVertexSet(env), - LabelPropagationData.getDefaultEdgeDataSet(env), env); - - List<Vertex<Long, Long>> result = inputGraph - .run(new LabelPropagation<Long, Long, NullValue>(1)) - .collect(); - - expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION; - compareResultAsTuples(result, expectedResult); - } - - @Test - public void testTieBreaker() throws Exception { - /* - * Test the label propagation example where a tie must be broken - */ - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet( - LabelPropagationData.getTieVertexSet(env), - LabelPropagationData.getTieEdgeDataSet(env), env); - - List<Vertex<Long, Long>> result = inputGraph - .run(new LabelPropagation<Long, Long, NullValue>(1)) - .collect(); - - expectedResult = LabelPropagationData.LABELS_WITH_TIE; - compareResultAsTuples(result, expectedResult); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java deleted file mode 100644 index 431ab70..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.PageRankData; -import org.apache.flink.graph.library.GSAPageRank; -import org.apache.flink.graph.library.PageRank; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class PageRankITCase extends MultipleProgramsTestBase { - - public PageRankITCase(TestExecutionMode mode){ - super(mode); - } - - @Test - public void testPageRankWithThreeIterations() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - @Test - public void testGSAPageRankWithThreeIterations() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - @Test - public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - @Test - public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( - PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env); - - List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3)) - .collect(); - - compareWithDelta(result, 0.01); - } - - private void compareWithDelta(List<Vertex<Long, Double>> result, - double delta) { - - String resultString = ""; - for (Vertex<Long, Double> v : result) { - resultString += v.f0.toString() + "," + v.f1.toString() +"\n"; - } - - String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS; - String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n"); - - String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n"); - - Arrays.sort(expected); - Arrays.sort(resultArray); - - for (int i = 0; i < expected.length; i++) { - String[] expectedFields = expected[i].split(","); - String[] resultFields = resultArray[i].split(","); - - double expectedPayLoad = Double.parseDouble(expectedFields[1]); - double resultPayLoad = Double.parseDouble(resultFields[1]); - - Assert.assertTrue("Values differ by more than the permissible delta", - Math.abs(expectedPayLoad - resultPayLoad) < delta); - } - } - - @SuppressWarnings("serial") - private static final class InitMapper implements MapFunction<Long, Double> { - public Double map(Long value) { - return 1.0; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java deleted file mode 100644 index abb4511..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import com.google.common.collect.Lists; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.LocalCollectionOutputFormat; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SummarizationData; -import org.apache.flink.graph.library.Summarization; -import org.apache.flink.graph.library.Summarization.EdgeValue; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.regex.Pattern; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -@RunWith(Parameterized.class) -public class SummarizationITCase extends MultipleProgramsTestBase { - - private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";"); - - private static final Pattern ID_SEPARATOR = Pattern.compile(","); - - public SummarizationITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testWithVertexAndEdgeValues() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, String, String> input = Graph.fromDataSet( - SummarizationData.getVertices(env), - SummarizationData.getEdges(env), - env - ); - - List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList(); - List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList(); - - Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output = - input.run(new Summarization<Long, String, String>()); - - output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices)); - output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges)); - - env.execute(); - - validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices); - validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges); - } - - @Test - public void testWithVertexAndAbsentEdgeValues() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph<Long, String, NullValue> input = Graph.fromDataSet( - SummarizationData.getVertices(env), - SummarizationData.getEdgesWithAbsentValues(env), - env - ); - - List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList(); - List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList(); - - Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output = - input.run(new Summarization<Long, String, NullValue>()); - - output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices)); - output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges)); - - env.execute(); - - validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices); - validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges); - } - - private void validateVertices(String[] expectedVertices, - List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) { - Arrays.sort(expectedVertices); - Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() { - @Override - public int compare(Vertex<Long, Summarization.VertexValue<String>> o1, - Vertex<Long, Summarization.VertexValue<String>> o2) { - int result = o1.getId().compareTo(o2.getId()); - if (result == 0) { - result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue()); - } - if (result == 0) { - result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue()); - } - if (result == 0) { - result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue()); - } - return result; - } - }); - - for (int i = 0; i < expectedVertices.length; i++) { - validateVertex(expectedVertices[i], actualVertices.get(i)); - } - } - - private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges, - List<Edge<Long, EdgeValue<EV>>> actualEdges) { - Arrays.sort(expectedEdges); - Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () { - - @Override - public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) { - int result = o1.getSource().compareTo(o2.getSource()); - if (result == 0) { - result = o1.getTarget().compareTo(o2.getTarget()); - } - if (result == 0) { - result = o1.getTarget().compareTo(o2.getTarget()); - } - if (result == 0) { - result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue()); - } - if (result == 0) { - result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount()); - } - return result; - } - }); - - for (int i = 0; i < expectedEdges.length; i++) { - validateEdge(expectedEdges[i], actualEdges.get(i)); - } - } - - private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) { - String[] tokens = TOKEN_SEPARATOR.split(expected); - assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId())); - assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue()); - assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount()); - } - - private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) { - String[] tokens = TOKEN_SEPARATOR.split(expected); - assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource())); - assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget())); - assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString()); - assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount()); - } - - private List<Long> getListFromIdRange(String idRange) { - List<Long> result = Lists.newArrayList(); - for (String id : ID_SEPARATOR.split(idRange)) { - result.add(Long.parseLong(id)); - } - return result; - } - - private String getGroupValue(String token) { - return ID_SEPARATOR.split(token)[0]; - } - - private Long getGroupCount(String token) { - return Long.valueOf(ID_SEPARATOR.split(token)[1]); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java deleted file mode 100644 index 15f59fe..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.example.utils.TriangleCountData; -import org.apache.flink.graph.library.GSATriangleCount; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class TriangleCountITCase extends MultipleProgramsTestBase { - - public TriangleCountITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testGSATriangleCount() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), - env).getUndirected(); - - List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect(); - String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; - - Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java deleted file mode 100644 index d06ba30..0000000 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test.library; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.example.utils.TriangleCountData; -import org.apache.flink.graph.library.TriangleEnumerator; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.types.NullValue; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.util.List; - -@RunWith(Parameterized.class) -public class TriangleEnumeratorITCase extends MultipleProgramsTestBase { - - public TriangleEnumeratorITCase(TestExecutionMode mode) { - super(mode); - } - - @Test - public void testTriangleEnumerator() throws Exception { - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), - env); - - List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect(); - List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles(); - - Assert.assertEquals(actualOutput.size(), expectedResult.size()); - for(Tuple3<Long,Long,Long> resultTriangle:actualOutput) { - Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml index 34153c9..64ad7e4 100644 --- a/flink-libraries/pom.xml +++ b/flink-libraries/pom.xml @@ -36,6 +36,7 @@ under the License. <modules> <module>flink-gelly</module> <module>flink-gelly-scala</module> + <module>flink-gelly-examples</module> <module>flink-python</module> <module>flink-table</module> <module>flink-ml</module>
