Repository: flink Updated Branches: refs/heads/master 1f726e482 -> 9eef3c86c
[FLINK-1726][gelly] Added Community Detection Library and Example This closes #505 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3ba403 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3ba403 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3ba403 Branch: refs/heads/master Commit: 4e3ba4039d694e539dcdbca74fd628140f85d5e9 Parents: 1f726e4 Author: andralungu <[email protected]> Authored: Fri Mar 20 16:43:59 2015 +0100 Committer: Vasia Kalavri <[email protected]> Committed: Thu Mar 26 23:36:00 2015 +0100 ---------------------------------------------------------------------- docs/gelly_guide.md | 1 + .../SimpleCommunityDetectionExample.java | 129 +++++++++++++ .../SingleSourceShortestPathsExample.java | 10 +- .../utils/SimpleCommunityDetectionData.java | 65 +++++++ .../graph/library/SimpleCommunityDetection.java | 187 +++++++++++++++++++ .../example/SimpleCommunityDetectionITCase.java | 100 ++++++++++ 6 files changed, 484 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/docs/gelly_guide.md ---------------------------------------------------------------------- diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md index 32c076b..0884405 100644 --- a/docs/gelly_guide.md +++ b/docs/gelly_guide.md @@ -402,6 +402,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc * PageRank * Single-Source Shortest Paths * Label Propagation +* Simple Community Detection Gelly's library methods can be used by simply calling the `run()` method on the input graph: http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java new file mode 100644 index 0000000..488603c --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData; +import org.apache.flink.graph.library.SimpleCommunityDetection; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; + +/** + * This example shows how to use the {@link org.apache.flink.graph.library.SimpleCommunityDetection} + * library method: + * <ul> + * <li> with the edge data set given as a parameter + * <li> with default data + * </ul> + */ +public class SimpleCommunityDetectionExample implements ProgramDescription { + + public static void main(String [] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // set up the graph + DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env); + Graph<Long, Long, Double> graph = Graph.fromDataSet(edges, + new MapFunction<Long, Long>() { + @Override + public Long map(Long label) throws Exception { + return label; + } + }, env); + + // the result is in the form of <vertexId, communityId>, where the communityId is the label + // which the vertex converged to + DataSet<Vertex<Long, Long>> communityVertices = + graph.run(new SimpleCommunityDetection(maxIterations, delta)).getVertices(); + + // emit result + if (fileOutput) { + communityVertices.writeAsCsv(outputPath, "\n", ","); + } else { + communityVertices.print(); + } + + env.execute("Executing Simple Community Detection Example"); + } + + @Override + public String getDescription() { + return "Simple Community Detection Example"; + } + + // 
************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String edgeInputPath = null; + private static String outputPath = null; + private static Integer maxIterations = SimpleCommunityDetectionData.MAX_ITERATIONS; + private static Double delta = SimpleCommunityDetectionData.DELTA; + + private static boolean parseParameters(String [] args) { + if(args.length > 0) { + if(args.length != 4) { + System.err.println("Usage SimpleCommunityDetection <edge path> <output path> " + + "<num iterations> <delta>"); + return false; + } + + fileOutput = true; + edgeInputPath = args[0]; + outputPath = args[1]; + maxIterations = Integer.parseInt(args[2]); + delta = Double.parseDouble(args[3]); + + } else { + System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data."); + System.out.println("Provide parameters to read input data from files."); + System.out.println("Usage SimpleCommunityDetection <edge path> <output path> " + + "<num iterations> <delta>"); + } + + return true; + } + + @SuppressWarnings("serial") + private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) { + + if(fileOutput) { + return env.readCsvFile(edgeInputPath) + .ignoreComments("#") + .fieldDelimiter("\t") + .lineDelimiter("\n") + .types(Long.class, Long.class, Double.class) + .map(new Tuple3ToEdgeMap<Long, Double>()); + } else { + return SimpleCommunityDetectionData.getDefaultEdgeDataSet(env); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java index ff523ce..22883a8 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java @@ -22,12 +22,12 @@ import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData; import org.apache.flink.graph.library.SingleSourceShortestPaths; +import org.apache.flink.graph.utils.Tuple3ToEdgeMap; /** * This example implements the Single Source Shortest Paths algorithm, @@ -126,13 +126,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription { .lineDelimiter("\n") .fieldDelimiter("\t") .types(Long.class, Long.class, Double.class) - .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() { - - @Override - public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception { - return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2); - } - }); + .map(new Tuple3ToEdgeMap<Long, Double>()); } else { return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env); } http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java new file mode 100644 index 0000000..20b562b --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.example.utils; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; + +import java.util.ArrayList; +import java.util.List; + +/** + * Provides the default data set used for the Simple Community Detection example program. + * If no parameters are given to the program, the default edge data set is used. 
+ */ +public class SimpleCommunityDetectionData { + + // the algorithm is not guaranteed to always converge + public static final Integer MAX_ITERATIONS = 30; + + public static final double DELTA = 0.5f; + + public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) { + + List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>(); + edges.add(new Edge<Long, Double>(1L, 2L, 1.0)); + edges.add(new Edge<Long, Double>(1L, 3L, 2.0)); + edges.add(new Edge<Long, Double>(1L, 4L, 3.0)); + edges.add(new Edge<Long, Double>(2L, 3L, 4.0)); + edges.add(new Edge<Long, Double>(2L, 4L, 5.0)); + edges.add(new Edge<Long, Double>(3L, 5L, 6.0)); + edges.add(new Edge<Long, Double>(5L, 6L, 7.0)); + edges.add(new Edge<Long, Double>(5L, 7L, 8.0)); + edges.add(new Edge<Long, Double>(6L, 7L, 9.0)); + edges.add(new Edge<Long, Double>(7L, 12L, 10.0)); + edges.add(new Edge<Long, Double>(8L, 9L, 11.0)); + edges.add(new Edge<Long, Double>(8L, 10L, 12.0)); + edges.add(new Edge<Long, Double>(8L, 11L, 13.0)); + edges.add(new Edge<Long, Double>(9L, 10L, 14.0)); + edges.add(new Edge<Long, Double>(9L, 11L, 15.0)); + edges.add(new Edge<Long, Double>(10L, 11L, 16.0)); + edges.add(new Edge<Long, Double>(10L, 12L, 17.0)); + edges.add(new Edge<Long, Double>(11L, 12L, 18.0)); + + return env.fromCollection(edges); + } + + private SimpleCommunityDetectionData() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java new file mode 100644 index 0000000..5d3afc7 --- /dev/null +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java 
@@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.library; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricIteration; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +import java.util.Map; +import java.util.TreeMap; + +/** + * Simple Community Detection Algorithm. + * + * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value. + * The vertices propagate their labels and max scores in iterations, each time adopting the label with the + * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction + * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value. 
+ * + * The algorithm converges when vertices no longer update their value or when the maximum number of iterations + * is reached. + * + * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a> + * + *<p> + * The input files is a plain text file and must be formatted as follows: + * <br> + * Edges are represented by tuples of srcVertexId, trgVertexId which are + * separated by tabs. Edges themselves are separated by newlines. + * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3 + * </p> + * + * Usage <code>SimpleCommunityDetection <edge path> <result path> + * <number of iterations> <delta></code><br> + * If no parameters are provided, the program is run with default data from + * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData} + */ +public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> { + + private Integer maxIterations; + + private Double delta; + + public SimpleCommunityDetection(Integer maxIterations, Double delta) { + + this.maxIterations = maxIterations; + this.delta = delta; + } + + @Override + public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) { + + Graph<Long, Long, Double> undirectedGraph = graph.getUndirected(); + + Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph + .mapVertices(new AddScoreToVertexValuesMapper()); + + VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double> + iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta), + new LabelMessenger(), maxIterations); + + return graphWithScoredVertices.runVertexCentricIteration(iteration) + .mapVertices(new RemoveScoreFromVertexValuesMapper()); + } + + public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> { + + private Double delta; + + public VertexLabelUpdater(Double delta) { + this.delta = delta; + 
} + + @Override + public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore, + MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception { + + // we would like these two maps to be ordered + Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>(); + Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>(); + + for (Tuple2<Long, Double> message : inMessages) { + // split the message into received label and score + Long receivedLabel = message.f0; + Double receivedScore = message.f1; + + // if the label was received before + if (receivedLabelsWithScores.containsKey(receivedLabel)) { + Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel); + receivedLabelsWithScores.put(receivedLabel, newScore); + } else { + // first time we see the label + receivedLabelsWithScores.put(receivedLabel, receivedScore); + } + + // store the labels with the highest scores + if (labelsWithHighestScore.containsKey(receivedLabel)) { + Double currentScore = labelsWithHighestScore.get(receivedLabel); + if (currentScore < receivedScore) { + // record the highest score + labelsWithHighestScore.put(receivedLabel, receivedScore); + } + } else { + // first time we see this label + labelsWithHighestScore.put(receivedLabel, receivedScore); + } + } + + if(receivedLabelsWithScores.size() > 0) { + // find the label with the highest score from the ones received + Double maxScore = -Double.MAX_VALUE; + Long maxScoreLabel = labelScore.f0; + for (Long curLabel : receivedLabelsWithScores.keySet()) { + + if (receivedLabelsWithScores.get(curLabel) > maxScore) { + maxScore = receivedLabelsWithScores.get(curLabel); + maxScoreLabel = curLabel; + } + } + + // find the highest score of maxScoreLabel + Double highestScore = labelsWithHighestScore.get(maxScoreLabel); + // re-score the new label + if (maxScoreLabel != labelScore.f0) { + highestScore -= delta / getSuperstepNumber(); + } + // else delta = 0 + // update own label + 
setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore)); + } + } + } + + public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>, + Tuple2<Long, Double>, Double> { + + @Override + public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception { + + for(Edge<Long, Double> edge : getOutgoingEdges()) { + sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue())); + } + + } + } + + @SuppressWarnings("serial") + public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> { + + @Override + public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception { + return new Tuple2<Long, Double>(vertex.getValue(), 1.0); + } + } + + @SuppressWarnings("serial") + public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> { + + @Override + public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception { + return vertex.getValue().f0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java new file mode 100644 index 0000000..def5006 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.graph.test.example;

import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.graph.example.SimpleCommunityDetectionExample;
import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;

/**
 * Integration tests for {@link SimpleCommunityDetectionExample}: each test runs the example for a single
 * iteration on a small edge file, and the written result is compared with the expected lines afterwards.
 */
@RunWith(Parameterized.class)
public class SimpleCommunityDetectionITCase extends MultipleProgramsTestBase {

	@Rule
	public TemporaryFolder tempFolder = new TemporaryFolder();

	/** URI of the file the example writes its result to. */
	private String outputUri;

	/** Expected result lines, set by each test and checked in {@link #after()}. */
	private String expectedResult;

	public SimpleCommunityDetectionITCase(TestExecutionMode mode) {
		super(mode);
	}

	@Before
	public void before() throws Exception {
		outputUri = tempFolder.newFile().toURI().toString();
	}

	@After
	public void after() throws Exception {
		compareResultsByLinesInMemory(expectedResult, outputUri);
	}

	@Test
	public void testSingleIteration() throws Exception {
		// one iteration of the Simple Community Detection Example
		runForOneIteration("1	2	1.0\n" + "1	3	2.0\n" + "1	4	3.0\n" + "1	5	4.0\n" + "2	6	5.0\n" +
				"6	7	6.0\n" + "6	8	7.0\n" + "7	8	8.0");

		expectedResult = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
	}

	@Test
	public void testTieBreaker() throws Exception {
		// one iteration where vertex 1 receives equally scored labels and a tie must be broken
		runForOneIteration("1	2	1.0\n" + "1	3	1.0\n" + "1	4	1.0\n" + "1	5	1.0");

		expectedResult = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
	}

	// -------------------------------------------------------------------------
	//  Util methods
	// -------------------------------------------------------------------------

	/** Writes the edges to a temporary file and runs the example on it for one iteration with the default delta. */
	private void runForOneIteration(String edges) throws Exception {
		String edgesUri = writeToTempFile(edges);
		String[] args = {edgesUri, outputUri, "1", String.valueOf(SimpleCommunityDetectionData.DELTA)};
		SimpleCommunityDetectionExample.main(args);
	}

	/** Writes the given rows as UTF-8 into a fresh temporary file and returns its URI. */
	private String writeToTempFile(final String rows) throws Exception {
		File file = tempFolder.newFile();
		Files.write(rows, file, Charsets.UTF_8);
		return file.toURI().toString();
	}
}
