Repository: flink
Updated Branches:
  refs/heads/master 1f726e482 -> 9eef3c86c


[FLINK-1726][gelly] Added Community Detection Library and Example

This closes #505


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3ba403
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3ba403
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3ba403

Branch: refs/heads/master
Commit: 4e3ba4039d694e539dcdbca74fd628140f85d5e9
Parents: 1f726e4
Author: andralungu <[email protected]>
Authored: Fri Mar 20 16:43:59 2015 +0100
Committer: Vasia Kalavri <[email protected]>
Committed: Thu Mar 26 23:36:00 2015 +0100

----------------------------------------------------------------------
 docs/gelly_guide.md                             |   1 +
 .../SimpleCommunityDetectionExample.java        | 129 +++++++++++++
 .../SingleSourceShortestPathsExample.java       |  10 +-
 .../utils/SimpleCommunityDetectionData.java     |  65 +++++++
 .../graph/library/SimpleCommunityDetection.java | 187 +++++++++++++++++++
 .../example/SimpleCommunityDetectionITCase.java | 100 ++++++++++
 6 files changed, 484 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 32c076b..0884405 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -402,6 +402,7 @@ Gelly has a growing collection of graph algorithms for 
easily analyzing large-sc
 * PageRank
 * Single-Source Shortest Paths
 * Label Propagation
+* Simple Community Detection
 
 Gelly's library methods can be used by simply calling the `run()` method on 
the input graph:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
new file mode 100644
index 0000000..488603c
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.graph.library.SimpleCommunityDetection;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link 
org.apache.flink.graph.library.SimpleCommunityDetection}
+ * library method:
+ * <ul>
+ *     <li> with the edge data set given as a parameter
+ *     <li> with default data
+ * </ul>
+ */
+public class SimpleCommunityDetectionExample implements ProgramDescription {
+
+       public static void main(String [] args) throws Exception {
+
+               if(!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+               // set up the graph
+               DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+               Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+                               new MapFunction<Long, Long>() {
+                                       @Override
+                                       public Long map(Long label) throws 
Exception {
+                                               return label;
+                                       }
+                               }, env);
+
+               // the result is in the form of <vertexId, communityId>, where 
the communityId is the label
+               // which the vertex converged to
+               DataSet<Vertex<Long, Long>> communityVertices =
+                               graph.run(new 
SimpleCommunityDetection(maxIterations, delta)).getVertices();
+
+               // emit result
+               if (fileOutput) {
+                       communityVertices.writeAsCsv(outputPath, "\n", ",");
+               } else {
+                       communityVertices.print();
+               }
+
+               env.execute("Executing Simple Community Detection Example");
+       }
+
+       @Override
+       public String getDescription() {
+               return "Simple Community Detection Example";
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String edgeInputPath = null;
+       private static String outputPath = null;
+       private static Integer maxIterations = 
SimpleCommunityDetectionData.MAX_ITERATIONS;
+       private static Double delta = SimpleCommunityDetectionData.DELTA;
+
+       private static boolean parseParameters(String [] args) {
+               if(args.length > 0) {
+                       if(args.length != 4) {
+                               System.err.println("Usage 
SimpleCommunityDetection <edge path> <output path> " +
+                                               "<num iterations> <delta>");
+                               return false;
+                       }
+
+                       fileOutput = true;
+                       edgeInputPath = args[0];
+                       outputPath = args[1];
+                       maxIterations = Integer.parseInt(args[2]);
+                       delta = Double.parseDouble(args[3]);
+
+               } else {
+                       System.out.println("Executing SimpleCommunityDetection 
example with default parameters and built-in default data.");
+                       System.out.println("Provide parameters to read input 
data from files.");
+                       System.out.println("Usage SimpleCommunityDetection 
<edge path> <output path> " +
+                                       "<num iterations> <delta>");
+               }
+
+               return true;
+       }
+
+       @SuppressWarnings("serial")
+       private static DataSet<Edge<Long, Double>> 
getEdgesDataSet(ExecutionEnvironment env) {
+
+               if(fileOutput) {
+                       return env.readCsvFile(edgeInputPath)
+                                       .ignoreComments("#")
+                                       .fieldDelimiter("\t")
+                                       .lineDelimiter("\n")
+                                       .types(Long.class, Long.class, 
Double.class)
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
+               } else {
+                       return 
SimpleCommunityDetectionData.getDefaultEdgeDataSet(env);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index ff523ce..22883a8 100644
--- 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -22,12 +22,12 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.library.SingleSourceShortestPaths;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example implements the Single Source Shortest Paths algorithm,
@@ -126,13 +126,7 @@ public class SingleSourceShortestPathsExample implements 
ProgramDescription {
                                        .lineDelimiter("\n")
                                        .fieldDelimiter("\t")
                                        .types(Long.class, Long.class, 
Double.class)
-                                       .map(new MapFunction<Tuple3<Long, Long, 
Double>, Edge<Long, Double>>() {
-
-                                               @Override
-                                               public Edge<Long, Double> 
map(Tuple3<Long, Long, Double> tuple3) throws Exception {
-                                                       return new Edge<Long, 
Double>(tuple3.f0, tuple3.f1, tuple3.f2);
-                                               }
-                                       });
+                                       .map(new Tuple3ToEdgeMap<Long, 
Double>());
                } else {
                        return 
SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
new file mode 100644
index 0000000..20b562b
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the Simple Community Detection 
example program.
+ * If no parameters are given to the program, the default edge data set is 
used.
+ */
+public class SimpleCommunityDetectionData {
+
+       // the algorithm is not guaranteed to always converge, so the example caps the iterations
+       public static final Integer MAX_ITERATIONS = 30;
+       // default delta of the example; a double literal, so no float value is widened into the constant
+       public static final double DELTA = 0.5;
+
+       public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+               // edges of the built-in graph, given as (source id, target id, edge value)
+               List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+               edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+               edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+               edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+               edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
+               edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
+               edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
+               edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
+               edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
+               edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
+               edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
+               edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
+               edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
+               edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
+               edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
+               edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
+               edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
+               edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
+               edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
+
+               return env.fromCollection(edges);
+       }
+       // only constants and a static factory live here, so the class is not instantiable
+       private SimpleCommunityDetectionData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
new file mode 100644
index 0000000..5d3afc7
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Simple Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along with 
a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each time 
adopting the label with the
+ * highest score from the list of received messages. The chosen label is 
afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a 
default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when 
the maximum number of iterations
+ * is reached.
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ *
+ *<p>
+ *     The input file is a plain text file and must be formatted as follows:
+ *     <br>
+ *     Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
+ *     separated by tabs. Edges themselves are separated by newlines.
+ *     For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two weighted edges 1-2 and 1-3
+ * </p>
+ *
+ * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
+ */
+public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, 
Double> {
+
+       private Integer maxIterations;
+
+       private Double delta;
+
+       public SimpleCommunityDetection(Integer maxIterations, Double delta) {
+
+               this.maxIterations = maxIterations;
+               this.delta = delta;
+       }
+
+       @Override
+       public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+               // the algorithm works on the undirected version of the input graph
+               Graph<Long, Long, Double> undirectedGraph = 
graph.getUndirected();
+               // turn every vertex value into a (label, score) tuple with an initial score of 1.0
+               Graph<Long, Tuple2<Long, Double>, Double> 
graphWithScoredVertices = undirectedGraph
+                               .mapVertices(new 
AddScoreToVertexValuesMapper());
+               // vertex-centric iteration built from VertexLabelUpdater and LabelMessenger, limited by maxIterations
+               VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, 
Double>, Double>
+                               iteration = 
graphWithScoredVertices.createVertexCentricIteration(new 
VertexLabelUpdater(delta),
+                               new LabelMessenger(), maxIterations);
+               // run the iteration, then drop the scores so the vertex values are plain labels again
+               return 
graphWithScoredVertices.runVertexCentricIteration(iteration)
+                               .mapVertices(new 
RemoveScoreFromVertexValuesMapper());
+       }
+
+       public static final class VertexLabelUpdater extends 
VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+               private Double delta;
+
+               public VertexLabelUpdater(Double delta) {
+                       this.delta = delta;
+               }
+
+               @Override
+               public void updateVertex(Long vertexKey, Tuple2<Long, Double> 
labelScore,
+                                                               
MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+                       // we would like these two maps to be ordered
+                       Map<Long, Double> receivedLabelsWithScores = new 
TreeMap<Long, Double>();
+                       Map<Long, Double> labelsWithHighestScore = new 
TreeMap<Long, Double>();
+
+                       for (Tuple2<Long, Double> message : inMessages) {
+                               // split the message into received label and 
score
+                               Long receivedLabel = message.f0;
+                               Double receivedScore = message.f1;
+
+                               // if the label was received before
+                               if 
(receivedLabelsWithScores.containsKey(receivedLabel)) {
+                                       Double newScore = receivedScore + 
receivedLabelsWithScores.get(receivedLabel);
+                                       
receivedLabelsWithScores.put(receivedLabel, newScore);
+                               } else {
+                                       // first time we see the label
+                                       
receivedLabelsWithScores.put(receivedLabel, receivedScore);
+                               }
+
+                               // store the labels with the highest scores
+                               if 
(labelsWithHighestScore.containsKey(receivedLabel)) {
+                                       Double currentScore = 
labelsWithHighestScore.get(receivedLabel);
+                                       if (currentScore < receivedScore) {
+                                               // record the highest score
+                                               
labelsWithHighestScore.put(receivedLabel, receivedScore);
+                                       }
+                               } else {
+                                       // first time we see this label
+                                       
labelsWithHighestScore.put(receivedLabel, receivedScore);
+                               }
+                       }
+
+                       if(receivedLabelsWithScores.size() > 0) {
+                               // find the label with the highest score from 
the ones received
+                               Double maxScore = -Double.MAX_VALUE;
+                               Long maxScoreLabel = labelScore.f0;
+                               for (Long curLabel : 
receivedLabelsWithScores.keySet()) {
+
+                                       if 
(receivedLabelsWithScores.get(curLabel) > maxScore) {
+                                               maxScore = 
receivedLabelsWithScores.get(curLabel);
+                                               maxScoreLabel = curLabel;
+                                       }
+                               }
+
+                               // find the highest score of maxScoreLabel
+                               Double highestScore = 
labelsWithHighestScore.get(maxScoreLabel);
+                               // re-score the new label
+                               if (maxScoreLabel != labelScore.f0) {
+                                       highestScore -= delta / 
getSuperstepNumber();
+                               }
+                               // else delta = 0
+                               // update own label
+                               setNewVertexValue(new Tuple2<Long, 
Double>(maxScoreLabel, highestScore));
+                       }
+               }
+       }
+
+       public static final class LabelMessenger extends 
MessagingFunction<Long, Tuple2<Long, Double>,
+                       Tuple2<Long, Double>, Double> {
+               // Messaging function: forwards the vertex's label together with an edge-weighted score.
+               @Override
+               public void sendMessages(Long vertexKey, Tuple2<Long, Double> 
vertexValue) throws Exception {
+                       // the target of each outgoing edge receives (label, score * value of that edge)
+                       for(Edge<Long, Double> edge : getOutgoingEdges()) {
+                               sendMessageTo(edge.getTarget(), new 
Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+                       }
+
+               }
+       }
+
+       @SuppressWarnings("serial")
+       public static final class AddScoreToVertexValuesMapper implements 
MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+               // pairs the vertex's current value (its label) with the initial score 1.0
+               @Override
+               public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) 
throws Exception {
+                       return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+               }
+       }
+
+       @SuppressWarnings("serial")
+       public static final class RemoveScoreFromVertexValuesMapper implements 
MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+               // drops the score and keeps only the label (field f0) as the vertex value
+               @Override
+               public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) 
throws Exception {
+                       return vertex.getValue().f0;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
new file mode 100644
index 0000000..def5006
--- /dev/null
+++ 
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.SimpleCommunityDetectionExample;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class SimpleCommunityDetectionITCase extends MultipleProgramsTestBase {
+
+       private String edgesPath;
+
+       private String resultPath;
+
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       public SimpleCommunityDetectionITCase(TestExecutionMode mode) {
+               super(mode);
+       }
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+       }
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testSingleIteration() throws Exception {
+               /*
+                * Test one iteration of the Simple Community Detection Example
+                */
+               final String edges = "1 2       1.0\n" + "1     3       2.0\n" 
+ "1     4       3.0\n" + "1     5       4.0\n" + "2     6       5.0\n" +
+                               "6      7       6.0\n" + "6     8       7.0\n" 
+ "7     8       8.0";
+               edgesPath = createTempFile(edges);
+
+               SimpleCommunityDetectionExample.main(new String[]{edgesPath, 
resultPath, "1",
+                               SimpleCommunityDetectionData.DELTA + ""});
+
+               expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + 
"6,8\n" + "7,8\n" + "8,7";
+       }
+
+       @Test
+       public void testTieBreaker() throws Exception {
+               /*
+                * Test one iteration of the Simple Community Detection Example 
where a tie must be broken
+                */
+
+               final String edges = "1 2       1.0\n" + "1     3       1.0\n" 
+ "1     4       1.0\n" + "1     5       1.0";
+               edgesPath = createTempFile(edges);
+
+               SimpleCommunityDetectionExample.main(new String[] {edgesPath, 
resultPath, "1",
+                               SimpleCommunityDetectionData.DELTA + ""});
+
+               expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+       }
+
+
+       // 
-------------------------------------------------------------------------
+       // Util methods
+       // 
-------------------------------------------------------------------------
+       private String createTempFile(final String rows) throws Exception {
+               File tempFile = tempFolder.newFile();
+               Files.write(rows, tempFile, Charsets.UTF_8);
+               return tempFile.toURI().toString();
+       }
+}

Reply via email to