Author: ssc
Date: Thu Jun 23 12:26:46 2011
New Revision: 1138850
URL: http://svn.apache.org/viewvc?rev=1138850&view=rev
Log:
MAHOUT-741 MapReduce job to compute the local clustering coefficient in an
undirected graph
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java?rev=1138850&r1=1138849&r2=1138850&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/model/Triangle.java
Thu Jun 23 12:26:46 2011
@@ -46,6 +46,18 @@ public class Triangle implements Writabl
this(new Vertex(firstVertexId), new Vertex(secondVertexId), new
Vertex(thirdVertexId));
}
+ /** @return the vertex held in the {@code first} field of this triangle */
+ public Vertex getFirstVertex() {
+   return this.first;
+ }
+
+ /** @return the vertex held in the {@code second} field of this triangle */
+ public Vertex getSecondVertex() {
+   return this.second;
+ }
+
+ /** @return the vertex held in the {@code third} field of this triangle */
+ public Vertex getThirdVertex() {
+   return this.third;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java?rev=1138850&r1=1138849&r2=1138850&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Thu Jun 23 12:26:46 2011
@@ -36,7 +36,7 @@ import java.util.Map;
public class DegreeDistributionJobTest extends MahoutTestCase {
-@Test
+ @Test
public void toyIntegrationTest() throws Exception {
File inputFile = getTestTempFile("edges.seq");
@@ -47,7 +47,7 @@ public class DegreeDistributionJobTest e
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
- SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new
Path(inputFile.getAbsolutePath().toString()),
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, new
Path(inputFile.getAbsolutePath()),
UndirectedEdge.class, NullWritable.class);
try {
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java?rev=1138850&view=auto
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
(added)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
Thu Jun 23 12:26:46 2011
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+import org.apache.mahout.math.Varint;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * <p>Distributed computation of the local clustering coefficient of the vertices of an undirected graph. The local
+ * clustering coefficient is a measure for the "connectedness" of a vertex in its neighborhood and is computed by
+ * dividing the number of closed triangles with a vertex's neighbors by the number of possible triangles of this
+ * vertex with its neighbors.</p>
+ *
+ * <p>The input files need to be {@link org.apache.hadoop.io.SequenceFile}s, one with {@link UndirectedEdge}s as
+ * keys and any Writable as values, as it is already produced by {@link SimplifyGraphJob}, the other with
+ * {@link Triangle}s as keys and any Writable as values, as it is already produced by
+ * {@link org.apache.mahout.graph.triangles.EnumerateTrianglesJob}</p>
+ *
+ * <p>This job outputs text files with a vertex id and its local clustering coefficient per line.</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>--edges=(path): Directory containing one or more sequence files with
edge data</li>
+ * <li>--triangles=(path): Directory containing one or more sequence files
with triangle data</li>
+ * <li>--output=(path): output path where the local clustering coefficients should be written</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments
must appear before all other arguments.</p>
+ */
+public class LocalClusteringCoefficientJob extends AbstractJob {
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new LocalClusteringCoefficientJob(), args);
+ }
+
+ /**
+  * Parses the command line and runs three consecutive M/R steps: counting the edges per vertex, counting the
+  * triangles per vertex and finally merging both counts into the local clustering coefficient of each vertex.
+  * A failing step aborts the run instead of letting the following steps work on missing or partial data.
+  *
+  * @param args command line arguments (--edges, --triangles, --output, --tempDir)
+  * @return 0 if all steps completed successfully, -1 if the arguments could not be parsed or a step failed
+  */
+ @Override
+ public int run(String[] args) throws Exception {
+
+   addOption("edges", "e", "path to the edges of the input graph", true);
+   addOption("triangles", "t", "path to the triangles of the input graph", true);
+   addOutputOption();
+
+   Map<String, String> parsedArgs = parseArguments(args);
+   if (parsedArgs == null) {
+     return -1;
+   }
+
+   Path edgesPath = new Path(parsedArgs.get("--edges"));
+   Path trianglesPath = new Path(parsedArgs.get("--triangles"));
+
+   Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+
+   Path edgesPerVertex = new Path(tempDirPath, "edgesPerVertex");
+   Path trianglesPerVertex = new Path(tempDirPath, "trianglesPerVertex");
+
+   // unfortunately we don't have access to an undeprecated MultipleInputs, so we need several M/R steps
+   // instead of one...
+   Job countEdgesPerVertex = prepareJob(edgesPath, edgesPerVertex, SequenceFileInputFormat.class,
+       EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
+       TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+   countEdgesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+   // waitForCompletion() reports failure through its return value, not through an exception
+   if (!countEdgesPerVertex.waitForCompletion(true)) {
+     return -1;
+   }
+
+   Job countTrianglesPerVertex = prepareJob(trianglesPath, trianglesPerVertex, SequenceFileInputFormat.class,
+       TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, Reducer.class, Vertex.class,
+       TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+   countTrianglesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+   if (!countTrianglesPerVertex.waitForCompletion(true)) {
+     return -1;
+   }
+
+   // both intermediate outputs are fed into the last step as one comma-separated input path
+   Job computeLocalClusteringCoefficient = prepareJob(new Path(edgesPerVertex + "," + trianglesPerVertex),
+       getOutputPath(), SequenceFileInputFormat.class, Mapper.class, Vertex.class, TriangleOrEdgeCount.class,
+       LocalClusteringCoefficientReducer.class, LongWritable.class, DoubleWritable.class, TextOutputFormat.class);
+   computeLocalClusteringCoefficient.setCombinerClass(TriangleOrEdgeCountCombiner.class);
+   if (!computeLocalClusteringCoefficient.waitForCompletion(true)) {
+     return -1;
+   }
+
+   return 0;
+ }
+
+ static class EdgeCountMapper extends
Mapper<UndirectedEdge,Writable,Vertex,TriangleOrEdgeCount> {
+
+ private static final TriangleOrEdgeCount ONE_EDGE = new
TriangleOrEdgeCount(1, false);
+
+ @Override
+ protected void map(UndirectedEdge edge, Writable value, Context ctx)
throws IOException, InterruptedException {
+ ctx.write(edge.getFirstVertex(), ONE_EDGE);
+ ctx.write(edge.getSecondVertex(), ONE_EDGE);
+ }
+ }
+
+ static class TriangleCountMapper extends
Mapper<Triangle,Writable,Vertex,TriangleOrEdgeCount> {
+
+ private static final TriangleOrEdgeCount ONE_TRIANGLE = new
TriangleOrEdgeCount(1, true);
+
+ @Override
+ protected void map(Triangle triangle, Writable value, Context ctx) throws
IOException, InterruptedException {
+ ctx.write(triangle.getFirstVertex(), ONE_TRIANGLE);
+ ctx.write(triangle.getSecondVertex(), ONE_TRIANGLE);
+ ctx.write(triangle.getThirdVertex(), ONE_TRIANGLE);
+ }
+ }
+
+ /**
+  * Sums up the edge counts and the triangle counts of a vertex and writes the vertex id together with
+  * {@code numTriangles / (numEdges * (numEdges - 1))}, or 0 for vertices with fewer than two edges.
+  */
+ static class LocalClusteringCoefficientReducer
+     extends Reducer<Vertex,TriangleOrEdgeCount,LongWritable,DoubleWritable> {
+   @Override
+   protected void reduce(Vertex vertex, Iterable<TriangleOrEdgeCount> counts, Context ctx)
+       throws IOException, InterruptedException {
+     // accumulate in long so that the sums of many partial counts cannot wrap around
+     long numEdges = 0;
+     long numTriangles = 0;
+
+     for (TriangleOrEdgeCount count : counts) {
+       if (count.isTriangles()) {
+         numTriangles += count.get();
+       } else {
+         numEdges += count.get();
+       }
+     }
+
+     // The denominator is computed in floating point: as an int product, numEdges * (numEdges - 1)
+     // overflows for vertices with more than 46341 edges and yields wrong or negative coefficients.
+     // NOTE(review): the usual definition for undirected graphs is 2 * T / (k * (k - 1)); this divides by
+     // k * (k - 1) only, which is what the accompanying test expects -- confirm this is intended.
+     double localClusteringCoefficient = numEdges > 1
+         ? (double) numTriangles / ((double) numEdges * (double) (numEdges - 1))
+         : 0d;
+
+     ctx.write(new LongWritable(vertex.getId()), new DoubleWritable(localClusteringCoefficient));
+   }
+ }
+
+ /**
+  * Pre-aggregates the counts of a vertex: all edge counts are folded into a single edge count and all triangle
+  * counts into a single triangle count; a sum of zero is not written.
+  */
+ static class TriangleOrEdgeCountCombiner extends Reducer<Vertex,TriangleOrEdgeCount,Vertex,TriangleOrEdgeCount> {
+
+   @Override
+   protected void reduce(Vertex vertex, Iterable<TriangleOrEdgeCount> counts, Context ctx)
+       throws IOException, InterruptedException {
+     int edgeSum = 0;
+     int triangleSum = 0;
+
+     for (TriangleOrEdgeCount partial : counts) {
+       int amount = partial.get();
+       if (!partial.isTriangles()) {
+         edgeSum += amount;
+       } else {
+         triangleSum += amount;
+       }
+     }
+
+     // the edge count is written before the triangle count
+     if (edgeSum > 0) {
+       ctx.write(vertex, new TriangleOrEdgeCount(edgeSum, false));
+     }
+     if (triangleSum > 0) {
+       ctx.write(vertex, new TriangleOrEdgeCount(triangleSum, true));
+     }
+   }
+ }
+
+
+ static class TriangleOrEdgeCount implements Writable {
+
+ private int count;
+ private boolean isTriangles;
+
+ TriangleOrEdgeCount() {}
+
+ public int get() {
+ return count;
+ }
+
+ public boolean isTriangles() {
+ return isTriangles;
+ }
+
+ TriangleOrEdgeCount(int count, boolean isTriangle) {
+ this.count = count;
+ this.isTriangles = isTriangle;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Varint.writeUnsignedVarInt(count, out);
+ out.writeBoolean(isTriangles);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ count = Varint.readUnsignedVarInt(in);
+ isTriangles = in.readBoolean();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TriangleOrEdgeCount) {
+ TriangleOrEdgeCount other = (TriangleOrEdgeCount) o;
+ return count == other.count && isTriangles == other.isTriangles;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * count + (isTriangles ? 1 : 0);
+ }
+ }
+}
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java?rev=1138850&view=auto
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
(added)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJobTest.java
Thu Jun 23 12:26:46 2011
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.model.Triangle;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+public class LocalClusteringCoefficientJobTest extends MahoutTestCase {
+
+ @Test
+ public void toyIntegrationTest() throws Exception {
+
+ File edgesFile = getTestTempFile("edges.seq");
+ File trianglesFile = getTestTempFile("triangles.seq");
+ File outputDir = getTestTempDir("output");
+ outputDir.delete();
+ File tempDir = getTestTempDir("tmp");
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+
+ SequenceFile.Writer edgesWriter = new SequenceFile.Writer(fs, conf, new
Path(edgesFile.getAbsolutePath()),
+ UndirectedEdge.class, NullWritable.class);
+ try {
+ edgesWriter.append(new UndirectedEdge(0, 1), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 2), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 3), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 4), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 5), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 6), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(0, 7), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(1, 2), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(1, 3), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(2, 3), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(4, 5), NullWritable.get());
+ edgesWriter.append(new UndirectedEdge(4, 7), NullWritable.get());
+ } finally {
+ Closeables.closeQuietly(edgesWriter);
+ }
+
+ SequenceFile.Writer trianglesWriter = new SequenceFile.Writer(fs, conf,
new Path(trianglesFile.getAbsolutePath()),
+ Triangle.class, NullWritable.class);
+ try {
+ trianglesWriter.append(new Triangle(0, 1, 2), NullWritable.get());
+ trianglesWriter.append(new Triangle(0, 1, 3), NullWritable.get());
+ trianglesWriter.append(new Triangle(0, 2, 3), NullWritable.get());
+ trianglesWriter.append(new Triangle(0, 4, 5), NullWritable.get());
+ trianglesWriter.append(new Triangle(0, 4, 7), NullWritable.get());
+ trianglesWriter.append(new Triangle(1, 2, 3), NullWritable.get());
+ } finally {
+ Closeables.closeQuietly(trianglesWriter);
+ }
+
+ LocalClusteringCoefficientJob clusteringCoefficientJob = new
LocalClusteringCoefficientJob();
+ clusteringCoefficientJob.setConf(conf);
+ clusteringCoefficientJob.run(new String[] { "--edges",
edgesFile.getAbsolutePath(),
+ "--triangles", trianglesFile.getAbsolutePath(), "--output",
outputDir.getAbsolutePath(),
+ "--tempDir", tempDir.getAbsolutePath() });
+
+ Map<Long,Double> localClusteringCoefficients = Maps.newHashMap();
+ for (String line : new FileLineIterable(new File(outputDir,
"part-r-00000"))) {
+ String[] tokens = Iterables.toArray(Splitter.on("\t").split(line),
String.class);
+ localClusteringCoefficients.put(Long.parseLong(tokens[0]),
Double.parseDouble(tokens[1]));
+ }
+
+ assertEquals(8, localClusteringCoefficients.size());
+ assertEquals(0.119047, localClusteringCoefficients.get(0L), EPSILON);
+ assertEquals(0.5, localClusteringCoefficients.get(1L), EPSILON);
+ assertEquals(0.5, localClusteringCoefficients.get(2L), EPSILON);
+ assertEquals(0.5, localClusteringCoefficients.get(3L), EPSILON);
+ assertEquals(0.333333, localClusteringCoefficients.get(4L), EPSILON);
+ assertEquals(0.5, localClusteringCoefficients.get(5L), EPSILON);
+ assertEquals(0, localClusteringCoefficients.get(6L), EPSILON);
+ assertEquals(0.5, localClusteringCoefficients.get(7L), EPSILON);
+ }
+}