http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java new file mode 100644 index 0000000..aaada8f --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.examples.data.TriangleCountData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class TriangleCountITCase extends MultipleProgramsTestBase { + + public TriangleCountITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testGSATriangleCount() throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), + env).getUndirected(); + + List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect(); + String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES; + + Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult)); + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java new file mode 100644 index 0000000..56b3289 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.examples.data.TriangleCountData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class TriangleEnumeratorITCase extends MultipleProgramsTestBase { + + public TriangleEnumeratorITCase(TestExecutionMode mode) { + super(mode); + } + + @Test + public void testTriangleEnumerator() throws Exception { + + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), + env); + + List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect(); + List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles(); + + Assert.assertEquals(actualOutput.size(), expectedResult.size()); + for(Tuple3<Long,Long,Long> resultTriangle:actualOutput) { + Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java new file mode 100755 index 0000000..7a3d550 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData; +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; +import org.apache.flink.graph.library.GSAConnectedComponents; +import org.apache.flink.graph.library.GSASingleSourceShortestPaths; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; + +@RunWith(Parameterized.class) +public class GatherSumApplyITCase extends MultipleProgramsTestBase { + + public GatherSumApplyITCase(TestExecutionMode mode){ + super(mode); + } + + private String expectedResult; + + // -------------------------------------------------------------------------------------------- + // Connected Components Test + // -------------------------------------------------------------------------------------------- + + @Test + public void testConnectedComponents() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet( + ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env), + new InitMapperCC(), env); + + List<Vertex<Long, Long>> result = inputGraph.run( + new GSAConnectedComponents<Long, NullValue>(16)).collect(); + + expectedResult = "1,1\n" + + "2,1\n" + + "3,1\n" + + "4,1\n"; + + compareResultAsTuples(result, expectedResult); + } + + // -------------------------------------------------------------------------------------------- + // Single Source Shortest Path Test + // -------------------------------------------------------------------------------------------- + + @Test + public void testSingleSourceShortestPaths() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph<Long, Double, Double> inputGraph = Graph.fromDataSet( + SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), + new InitMapperSSSP(), env); + + List<Vertex<Long, Double>> result = inputGraph.run( + new GSASingleSourceShortestPaths<>(1L, 16)).collect(); + + expectedResult = "1,0.0\n" + + "2,12.0\n" + + "3,13.0\n" + + "4,47.0\n" + + "5,48.0\n"; + + compareResultAsTuples(result, expectedResult); + } + + @SuppressWarnings("serial") + private static final class InitMapperCC implements MapFunction<Long, Long> { + public Long map(Long value) { + return value; + } + } + + @SuppressWarnings("serial") + private static final class InitMapperSSSP implements MapFunction<Long, Double> { + public Double map(Long value) { + return 0.0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java new file mode 100644 index 0000000..d0de8dc --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.examples.ConnectedComponents; +import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class ConnectedComponentsITCase extends MultipleProgramsTestBase { + + private String edgesPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public ConnectedComponentsITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + + File edgesFile = tempFolder.newFile(); + Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8); + edgesPath = edgesFile.toURI().toString(); + } + + @Test + public void testConnectedComponentsExample() throws Exception { + ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""}); + expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java new file mode 100644 index 0000000..922c4b2 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.examples.EuclideanGraphWeighing; +import org.apache.flink.graph.examples.data.EuclideanGraphData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase { + + private String verticesPath; + + private String edgesPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public EuclideanGraphWeighingITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + File verticesFile = tempFolder.newFile(); + Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + } + + @Test + public void testGraphWeightingWeighing() throws Exception { + EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath}); + expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java new file mode 100644 index 0000000..d27dcd8 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.examples.IncrementalSSSP; +import org.apache.flink.graph.examples.data.IncrementalSSSPData; +import org.apache.flink.graph.spargel.ScatterGatherConfiguration; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class IncrementalSSSPITCase extends MultipleProgramsTestBase { + + private String verticesPath; + + private String edgesPath; + + private String edgesInSSSPPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public IncrementalSSSPITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + File verticesFile = tempFolder.newFile(); + Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8); + + File edgesFile = tempFolder.newFile(); + Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8); + + File edgesInSSSPFile = tempFolder.newFile(); + Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8); + + verticesPath = verticesFile.toURI().toString(); + edgesPath = edgesFile.toURI().toString(); + edgesInSSSPPath = edgesInSSSPFile.toURI().toString(); + } + + @Test + public void testIncrementalSSSP() throws Exception { + IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath, + IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED, + IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, IncrementalSSSPData.NUM_VERTICES + ""}); + expected = IncrementalSSSPData.RESULTED_VERTICES; + } + + @Test + public void testIncrementalSSSPNonSPEdge() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env); + DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env); + DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env); + // the edge to be removed is a non-SP edge + Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0); + + Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env); + // Assumption: all minimum weight paths are kept + Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env); + // remove the edge + graph.removeEdge(edgeToBeRemoved); + + // configure the iteration + ScatterGatherConfiguration parameters = new ScatterGatherConfiguration(); + + if(IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) { + + parameters.setDirection(EdgeDirection.IN); + parameters.setOptDegrees(true); + + // run the scatter gather iteration to propagate info + Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration( + new IncrementalSSSP.VertexDistanceUpdater(), + new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved), + IncrementalSSSPData.NUM_VERTICES, parameters); + + DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices(); + + resultedVertices.writeAsCsv(resultPath, "\n", ","); + env.execute(); + } else { + vertices.writeAsCsv(resultPath, "\n", ","); + env.execute(); + } + + expected = IncrementalSSSPData.VERTICES; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java new file mode 100644 index 0000000..92cca86 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.examples.JaccardSimilarityMeasure; +import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase { + + private String edgesPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public JaccardSimilarityMeasureITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + + File edgesFile = tempFolder.newFile(); + Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8); + + edgesPath = edgesFile.toURI().toString(); + } + + @Test + public void testJaccardSimilarityMeasureExample() throws Exception { + JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath}); + expected = JaccardSimilarityMeasureData.JACCARD_EDGES; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java new file mode 100644 index 0000000..d76a3ec --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import org.apache.flink.graph.examples.MusicProfiles; +import org.apache.flink.graph.examples.data.MusicProfilesData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; + +@RunWith(Parameterized.class) +public class MusicProfilesITCase extends MultipleProgramsTestBase { + + private String tripletsPath; + + private String mismatchesPath; + + private String topSongsResultPath; + + private String communitiesResultPath; + + private String expectedTopSongs; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public MusicProfilesITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + topSongsResultPath = tempFolder.newFile().toURI().toString(); + communitiesResultPath = tempFolder.newFile().toURI().toString(); + + File tripletsFile = tempFolder.newFile(); + Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8); + tripletsPath = tripletsFile.toURI().toString(); + + File mismatchesFile = tempFolder.newFile(); + Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8); + mismatchesPath = mismatchesFile.toURI().toString(); + } + + @Test + public void testMusicProfilesExample() throws Exception { + MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath, + MusicProfilesData.MAX_ITERATIONS + ""}); + expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath); + + ArrayList<String> list = new ArrayList<>(); + TestBaseUtils.readAllResultLines(list, communitiesResultPath, new String[]{}, false); + + String[] result = list.toArray(new String[list.size()]); + Arrays.sort(result); + + // check that user_1 and user_2 are in the same community + Assert.assertEquals("users 1 and 2 are not in the same community", + result[0].substring(7), result[1].substring(7)); + + // check that user_3, user_4 and user_5 are in the same community + Assert.assertEquals("users 3 and 4 are not in the same community", + result[2].substring(7), result[3].substring(7)); + Assert.assertEquals("users 4 and 5 are not in the same community", + result[3].substring(7), result[4].substring(7)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java new file mode 100644 index 0000000..faf92c0 --- /dev/null +++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.examples; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; + +import org.apache.flink.graph.examples.GSASingleSourceShortestPaths; +import org.apache.flink.graph.examples.SingleSourceShortestPaths; +import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase { + + private String edgesPath; + + private String resultPath; + + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + public SingleSourceShortestPathsITCase(TestExecutionMode mode) { + super(mode); + } + + @Before + public void before() throws Exception { + resultPath = tempFolder.newFile().toURI().toString(); + + File edgesFile = tempFolder.newFile(); + Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8); + edgesPath = edgesFile.toURI().toString(); + } + + @Test + public void testSSSPExample() throws Exception { + SingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "", + edgesPath, resultPath, 10 + ""}); + expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; + } + + @Test + public void testGSASSSPExample() throws Exception { + GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "", + edgesPath, resultPath, 10 + ""}); + expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS; + } + + @After + public void after() throws Exception { + TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala deleted file mode 100644 index 75b793e..0000000 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.example - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.Edge -import org.apache.flink.types.NullValue -import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.graph.library.GSAConnectedComponents -import java.lang.Long - -/** - * This example shows how to use Gelly's library methods. - * You can find all available library methods in [[org.apache.flink.graph.library]]. - * - * In particular, this example uses the - * [[org.apache.flink.graph.library.GSAConnectedComponents]] - * library method to compute the connected components of the input graph. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\n1\t3\n</code> defines two edges, - * 1-2 and 1-3. - * - * Usage {{ - * ConnectedComponents <edge path> <result path> <number of iterations> - * }} - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]] - */ -object ConnectedComponents { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env) - val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env) - - val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations)) - - - // emit result - if (fileOutput) { - components.writeAsCsv(outputPath, "\n", ",") - env.execute("Connected Components Example") - } else { - components.print() - } - } - - private final class InitVertices extends MapFunction[Long, Long] { - override def map(id: Long) = id - } - - // *********************************************************************** - // UTIL METHODS - // *********************************************************************** - - private var fileOutput = false - private var edgesInputPath: String = null - private var outputPath: String = null - private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS - - private def parseParameters(args: Array[String]): Boolean = { - if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>") - } - fileOutput = true - edgesInputPath = args(0) - outputPath = args(1) - maxIterations = 2 - } else { - System.out.println("Executing ConnectedComponents example with default parameters" + - " and built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>") - } - true - } - - private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { - if (fileOutput) { - env.readCsvFile[(Long, Long)](edgesInputPath, - lineDelimiter = "\n", - fieldDelimiter = "\t") - .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) - } else { - val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map { - case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) - } - env.fromCollection(edgeData).map( - edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance)) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala deleted file mode 100644 index 68435ba..0000000 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.example - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala._ -import org.apache.flink.graph.Edge -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData -import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction} -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap - -/** - * This example shows how to use Gelly's gather-sum-apply iterations. - * - * It is an implementation of the Single-Source-Shortest-Paths algorithm. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, - * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] - */ -object GSASingleSourceShortestPaths { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) - val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) - - // Execute the gather-sum-apply iteration - val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, - new UpdateDistance, maxIterations) - - // Extract the vertices as the result - val singleSourceShortestPaths = result.getVertices - - // emit result - if (fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") - env.execute("GSA Single Source Shortest Paths Example") - } else { - singleSourceShortestPaths.print() - } - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { - - override def map(id: Long) = { - if (id.equals(srcId)) { - 0.0 - } else { - Double.PositiveInfinity - } - } - } - - private final class CalculateDistances extends GatherFunction[Double, Double, Double] { - override def gather(neighbor: Neighbor[Double, Double]) = { - neighbor.getNeighborValue + neighbor.getEdgeValue - } - } - - private final class ChooseMinDistance extends SumFunction[Double, Double, Double] { - override def sum(newValue: Double, currentValue: Double) = { - Math.min(newValue, currentValue) - } - } - - private final class UpdateDistance extends ApplyFunction[Long, Double, Double] { - override def apply(newDistance: Double, oldDistance: Double) = { - if (newDistance < oldDistance) { - setResult(newDistance) - } - } - } - - // ************************************************************************** - // UTIL METHODS - // ************************************************************************** - - private var fileOutput = false - private var srcVertexId = 1L - private var edgesInputPath: String = null - private var outputPath: String = null - private var maxIterations = 5 - - private def parseParameters(args: Array[String]): Boolean = { - if(args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>") - } - fileOutput = true - srcVertexId = args(0).toLong - edgesInputPath = args(1) - outputPath = args(2) - maxIterations = 3 - } else { - System.out.println("Executing Single Source Shortest Paths example " - + "with default parameters and built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>") - } - true - } - - private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { - if (fileOutput) { - env.readCsvFile[(Long, Long, Double)](edgesInputPath, - lineDelimiter = "\n", - fieldDelimiter = "\t") - .map(new Tuple3ToEdgeMap[Long, Double]()) - } else { - val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map { - case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long], - z.asInstanceOf[Double]) - } - env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]()) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala deleted file mode 100644 index 1c3fcdd..0000000 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.graph.scala.example - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.types.NullValue -import org.apache.flink.graph.Edge -import org.apache.flink.util.Collector - -/** - * This example illustrates how to use Gelly metrics methods and get simple statistics - * from the input graph. - * - * The program creates a random graph and computes and prints - * the following metrics: - * - number of vertices - * - number of edges - * - average node degree - * - the vertex ids with the max/min in- and out-degrees - * - * The input file is expected to contain one edge per line, - * with long IDs and no values, in the following format: - * {{{ - * <sourceVertexID>\t<targetVertexID> - * }}} - * If no arguments are provided, the example runs with a random graph of 100 vertices. - * - */ -object GraphMetrics { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - /** create the graph **/ - val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env) - - /** get the number of vertices **/ - val numVertices = graph.numberOfVertices - - /** get the number of edges **/ - val numEdges = graph.numberOfEdges - - /** compute the average node degree **/ - val verticesWithDegrees = graph.getDegrees - val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble) - - /** find the vertex with the maximum in-degree **/ - val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1) - - /** find the vertex with the minimum in-degree **/ - val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1) - - /** find the vertex with the maximum out-degree **/ - val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1) - - /** find the vertex with the minimum out-degree **/ - val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1) - - /** print the results **/ - env.fromElements(numVertices).printOnTaskManager("Total number of vertices") - env.fromElements(numEdges).printOnTaskManager("Total number of edges") - avgDegree.printOnTaskManager("Average node degree") - maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") - minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree") - maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") - minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree") - - } - - private def parseParameters(args: Array[String]): Boolean = { - if (args.length > 0) { - fileOutput = true - if (args.length == 1) { - edgesPath = args(0) - true - } else { - System.err.println("Usage: GraphMetrics <edges path>") - false - } - } else { - System.out.println("Executing GraphMetrics example with built-in default data.") - System.out.println(" Provide parameters to read input data from a file.") - System.out.println(" Usage: GraphMetrics <edges path>") - true - } - } - - private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = { - if (fileOutput) { - env.readCsvFile[(Long, Long)]( - edgesPath, - fieldDelimiter = "\t").map( - in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance())) - } else { - env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]]( - (key: Long, out: Collector[Edge[Long, NullValue]]) => { - val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt - for ( i <- 0 to numOutEdges ) { - val target: Long = ((Math.random() * numVertices) + 1).toLong - new Edge[Long, NullValue](key, target, NullValue.getInstance()) - } - }) - } - } - - private var fileOutput: Boolean = false - private var edgesPath: String = null - private var outputPath: String = null - private val numVertices = 100 -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala deleted file mode 100644 index 827f1a3..0000000 --- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.example - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.Edge -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.graph.spargel.VertexUpdateFunction -import org.apache.flink.graph.spargel.MessageIterator -import org.apache.flink.graph.Vertex -import org.apache.flink.graph.spargel.MessagingFunction -import scala.collection.JavaConversions._ -import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData - -/** - * This example shows how to use Gelly's scatter-gather iterations. - * - * It is an implementation of the Single-Source-Shortest-Paths algorithm. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, - * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. - * - * If no parameters are provided, the program is run with default data from - * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]] - */ -object SingleSourceShortestPaths { - def main(args: Array[String]) { - if (!parseParameters(args)) { - return - } - - val env = ExecutionEnvironment.getExecutionEnvironment - val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env) - val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env) - - // Execute the scatter-gather iteration - val result = graph.runScatterGatherIteration(new VertexDistanceUpdater, - new MinDistanceMessenger, maxIterations) - - // Extract the vertices as the result - val singleSourceShortestPaths = result.getVertices - - // emit result - if (fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",") - env.execute("Single Source Shortest Paths Example") - } else { - singleSourceShortestPaths.print() - } - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] { - - override def map(id: Long) = { - if (id.equals(srcId)) { - 0.0 - } else { - Double.PositiveInfinity - } - } - } - - /** - * Function that updates the value of a vertex by picking the minimum - * distance from all incoming messages. - */ - private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] { - - override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) { - var minDistance = Double.MaxValue - while (inMessages.hasNext) { - val msg = inMessages.next - if (msg < minDistance) { - minDistance = msg - } - } - if (vertex.getValue > minDistance) { - setNewVertexValue(minDistance) - } - } - } - - /** - * Distributes the minimum distance associated with a given vertex among all - * the target vertices summed up with the edge's value. - */ - private final class MinDistanceMessenger extends - MessagingFunction[Long, Double, Double, Double] { - - override def sendMessages(vertex: Vertex[Long, Double]) { - if (vertex.getValue < Double.PositiveInfinity) { - for (edge: Edge[Long, Double] <- getEdges) { - sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue) - } - } - } - } - - // **************************************************************************** - // UTIL METHODS - // **************************************************************************** - - private var fileOutput = false - private var srcVertexId = 1L - private var edgesInputPath: String = null - private var outputPath: String = null - private var maxIterations = 5 - - private def parseParameters(args: Array[String]): Boolean = { - if(args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>") - } - fileOutput = true - srcVertexId = args(0).toLong - edgesInputPath = args(1) - outputPath = args(2) - maxIterations = 3 - } else { - System.out.println("Executing Single Source Shortest Paths example " - + "with default parameters and built-in default data.") - System.out.println(" Provide parameters to read input data from files.") - System.out.println(" See the documentation for the correct format of input files.") - System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>") - } - true - } - - private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = { - if (fileOutput) { - env.readCsvFile[(Long, Long, Double)](edgesInputPath, - lineDelimiter = "\n", - fieldDelimiter = "\t") - .map(new Tuple3ToEdgeMap[Long, Double]()) - } else { - val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map { - case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long], - z.asInstanceOf[Double]) - } - env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]()) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java deleted file mode 100644 index cd52e04..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData; -import org.apache.flink.graph.library.GSAConnectedComponents; -import org.apache.flink.types.NullValue; - -/** - * This example shows how to use Gelly's library methods. - * You can find all available library methods in {@link org.apache.flink.graph.library}. - * - * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents} - * library method to compute the connected components of the input graph. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\n1\t3\n</code> defines two edges, - * 1-2 with and 1-3. - * - * Usage <code>ConnectedComponents <edge path> <result path> - * <number of iterations> </code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData} - */ -public class ConnectedComponents implements ProgramDescription { - - @SuppressWarnings("serial") - public static void main(String [] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env); - - Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() { - @Override - public Long map(Long value) throws Exception { - return value; - } - }, env); - - DataSet<Vertex<Long, Long>> verticesWithMinIds = graph - .run(new GSAConnectedComponents<Long, NullValue>(maxIterations)); - - // emit result - if (fileOutput) { - verticesWithMinIds.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Connected Components Example"); - } else { - verticesWithMinIds.print(); - } - } - - @Override - public String getDescription() { - return "Connected Components Example"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgeInputPath = null; - private static String outputPath = null; - private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS; - - private static boolean parseParameters(String [] args) { - if(args.length > 0) { - if(args.length != 3) { - System.err.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - return false; - } - - fileOutput = true; - edgeInputPath = args[0]; - outputPath = args[1]; - maxIterations = Integer.parseInt(args[2]); - - } else { - System.out.println("Executing ConnectedComponents example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("Usage ConnectedComponents <edge path> <output path> " + - "<num iterations>"); - } - - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - @Override - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { - return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); - } - }); - } else { - return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java deleted file mode 100644 index 712be3e..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeJoinFunction; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Triplet; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.EuclideanGraphData; - -import java.io.Serializable; - -/** - * This example shows how to use Gelly's {@link Graph#getTriplets()} and - * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods. - * - * Given a directed, unweighted graph, with vertex values representing points in a plan, - * return a weighted graph where the edge weights are equal to the Euclidean distance between the - * src and the trg vertex values. - * - * <p> - * Input files are plain text files and must be formatted as follows: - * <ul> - * <li> Vertices are represented by their vertexIds and vertex values and are separated by newlines, - * the value being formed of two doubles separated by a comma. - * For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices - * <li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas. - * Edges themselves are separated by newlines. - * For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3. - * </ul> - * - * Usage <code>EuclideanGraphWeighing <vertex path> <edge path> <result path></code><br> - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.EuclideanGraphData} - */ -@SuppressWarnings("serial") -public class EuclideanGraphWeighing implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env); - - DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); - - Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env); - - // the edge value will be the Euclidean distance between its src and trg vertex - DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets() - .map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() { - - @Override - public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet) - throws Exception { - - Vertex<Long, Point> srcVertex = triplet.getSrcVertex(); - Vertex<Long, Point> trgVertex = triplet.getTrgVertex(); - - return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(), - srcVertex.getValue().euclideanDistance(trgVertex.getValue())); - } - }); - - Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight, - new EdgeJoinFunction<Double, Double>() { - - public Double edgeJoin(Double edgeValue, Double inputValue) { - return inputValue; - } - }); - - // retrieve the edges from the final result - DataSet<Edge<Long, Double>> result = resultedGraph.getEdges(); - - // emit result - if (fileOutput) { - result.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("Euclidean Graph Weighing Example"); - } else { - result.print(); - } - - } - - @Override - public String getDescription() { - return "Weighing a graph by computing the Euclidean distance " + - "between its vertices"; - } - - // ************************************************************************* - // DATA TYPES - // ************************************************************************* - - /** - * A simple two-dimensional point. - */ - public static class Point implements Serializable { - - public double x, y; - - public Point() {} - - public Point(double x, double y) { - this.x = x; - this.y = y; - } - - public double euclideanDistance(Point other) { - return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y)); - } - - @Override - public String toString() { - return x + " " + y; - } - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String verticesInputPath = null; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if (args.length == 3) { - fileOutput = true; - verticesInputPath = args[0]; - edgesInputPath = args[1]; - outputPath = args[2]; - } else { - System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data."); - System.out.println("Provide parameters to read input data from files."); - System.out.println("See the documentation for the correct format of input files."); - System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" + - " <output path>"); - return false; - } - } - return true; - } - - private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(verticesInputPath) - .lineDelimiter("\n") - .types(Long.class, Double.class, Double.class) - .map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() { - - @Override - public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception { - return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2)); - } - }); - } else { - return EuclideanGraphData.getDefaultVertexDataSet(env); - } - } - - private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() { - - @Override - public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception { - return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0); - } - }); - } else { - return EuclideanGraphData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java deleted file mode 100755 index 635a099..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; -import org.apache.flink.graph.gsa.ApplyFunction; -import org.apache.flink.graph.gsa.GatherFunction; -import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.gsa.Neighbor; -import org.apache.flink.graph.utils.Tuple3ToEdgeMap; - -/** - * This example shows how to use Gelly's Gather-Sum-Apply iterations. - * - * It is an implementation of the Single-Source-Shortest-Paths algorithm. - * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. - * - * The input file is a plain text file and must be formatted as follows: - * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are - * separated by tabs. Edges themselves are separated by newlines. - * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges, - * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4. - * - * If no parameters are provided, the program is run with default data from - * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData} - */ -public class GSASingleSourceShortestPaths implements ProgramDescription { - - // -------------------------------------------------------------------------------------------- - // Program - // -------------------------------------------------------------------------------------------- - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env); - - Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env); - - // Execute the GSA iteration - Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration( - new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations); - - // Extract the vertices as the result - DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices(); - - // emit result - if(fileOutput) { - singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ","); - - // since file sinks are lazy, we trigger the execution explicitly - env.execute("GSA Single Source Shortest Paths"); - } else { - singleSourceShortestPaths.print(); - } - - } - - // -------------------------------------------------------------------------------------------- - // Single Source Shortest Path UDFs - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("serial") - private static final class InitVertices implements MapFunction<Long, Double>{ - - private long srcId; - - public InitVertices(long srcId) { - this.srcId = srcId; - } - - public Double map(Long id) { - if (id.equals(srcId)) { - return 0.0; - } - else { - return Double.POSITIVE_INFINITY; - } - } - } - - @SuppressWarnings("serial") - private static final class CalculateDistances extends GatherFunction<Double, Double, Double> { - - public Double gather(Neighbor<Double, Double> neighbor) { - return neighbor.getNeighborValue() + neighbor.getEdgeValue(); - } - }; - - @SuppressWarnings("serial") - private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> { - - public Double sum(Double newValue, Double currentValue) { - return Math.min(newValue, currentValue); - } - }; - - @SuppressWarnings("serial") - private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> { - - public void apply(Double newDistance, Double oldDistance) { - if (newDistance < oldDistance) { - setResult(newDistance); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Util methods - // -------------------------------------------------------------------------------------------- - - private static boolean fileOutput = false; - - private static Long srcVertexId = 1l; - - private static String edgesInputPath = null; - - private static String outputPath = null; - - private static int maxIterations = 5; - - private static boolean parseParameters(String[] args) { - - if (args.length > 0) { - if(args.length != 4) { - System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - return false; - } - - fileOutput = true; - srcVertexId = Long.parseLong(args[0]); - edgesInputPath = args[1]; - outputPath = args[2]; - maxIterations = Integer.parseInt(args[3]); - } else { - System.out.println("Executing GSASingle Source Shortest Paths example " - + "with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" + - " <input edges path> <output path> <num iterations>"); - } - return true; - } - - private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class, Double.class) - .map(new Tuple3ToEdgeMap<Long, Double>()); - } else { - return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); - } - } - - @Override - public String getDescription() { - return "GSA Single Source Shortest Paths"; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java deleted file mode 100644 index 117f7d1..0000000 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.example; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.example.utils.ExampleUtils; -import org.apache.flink.types.NullValue; - -/** - * This example illustrates how to use Gelly metrics methods and get simple statistics - * from the input graph. - * - * The program creates a random graph and computes and prints - * the following metrics: - * - number of vertices - * - number of edges - * - average node degree - * - the vertex ids with the max/min in- and out-degrees - * - * The input file is expected to contain one edge per line, - * with long IDs and no values, in the following format: - * "<sourceVertexID>\t<targetVertexID>". - * If no arguments are provided, the example runs with a random graph of 100 vertices. - * - */ -public class GraphMetrics implements ProgramDescription { - - public static void main(String[] args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - /** create the graph **/ - Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env); - - /** get the number of vertices **/ - long numVertices = graph.numberOfVertices(); - - /** get the number of edges **/ - long numEdges = graph.numberOfEdges(); - - /** compute the average node degree **/ - DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees(); - - DataSet<Double> avgNodeDegree = verticesWithDegrees - .aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices)); - - /** find the vertex with the maximum in-degree **/ - DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId()); - - /** find the vertex with the minimum in-degree **/ - DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId()); - - /** find the vertex with the maximum out-degree **/ - DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId()); - - /** find the vertex with the minimum out-degree **/ - DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId()); - - /** print the results **/ - ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices"); - ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges"); - ExampleUtils.printResult(avgNodeDegree, "Average node degree"); - ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree"); - ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree"); - ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree"); - ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree"); - - env.execute(); - } - - @SuppressWarnings("serial") - private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> { - - private long numberOfVertices; - - public AvgNodeDegreeMapper(long numberOfVertices) { - this.numberOfVertices = numberOfVertices; - } - - public Double map(Tuple2<Long, Long> sumTuple) { - return (double) (sumTuple.f1 / numberOfVertices) ; - } - } - - @SuppressWarnings("serial") - private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> { - public Long map(Tuple2<Long, Long> value) { return value.f0; } - } - - @Override - public String getDescription() { - return "Graph Metrics Example"; - } - - // ****************************************************************************************************************** - // UTIL METHODS - // ****************************************************************************************************************** - - private static boolean fileOutput = false; - - private static String edgesInputPath = null; - - static final int NUM_VERTICES = 100; - - static final long SEED = 9876; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length != 1) { - System.err.println("Usage: GraphMetrics <input edges>"); - return false; - } - - fileOutput = true; - edgesInputPath = args[0]; - } else { - System.out.println("Executing Graph Metrics example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println("Usage: GraphMetrics <input edges>"); - } - return true; - } - - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n").fieldDelimiter("\t") - .types(Long.class, Long.class).map( - new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) { - return new Edge<Long, NullValue>(value.f0, value.f1, - NullValue.getInstance()); - } - }); - } else { - return ExampleUtils.getRandomEdges(env, NUM_VERTICES); - } - } -} \ No newline at end of file
