Author: ssc
Date: Wed Jun 22 07:58:47 2011
New Revision: 1138329
URL: http://svn.apache.org/viewvc?rev=1138329&view=rev
Log:
MAHOUT-739 MapReduce job to compute the degree distribution of an undirected
graph
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java?rev=1138329&view=auto
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
(added)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/SumReducer.java
Wed Jun 22 07:58:47 2011
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
+public class SumReducer extends
Reducer<WritableComparable<?>,IntWritable,WritableComparable<?>,IntWritable> {
+ @Override
+ protected void reduce(WritableComparable<?> key, Iterable<IntWritable>
counts, Context ctx)
+ throws IOException, InterruptedException {
+ int sum = 0;
+ for (IntWritable count : counts) {
+ sum += count.get();
+ }
+ ctx.write(key, new IntWritable(sum));
+ }
+ }
Added:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1138329&view=auto
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
(added)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
Wed Jun 22 07:58:47 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.mapreduce.SumReducer;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.apache.mahout.graph.model.Vertex;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * <p>Distributed computation of the distribution of degrees of an undirected graph</p>
+ *
+ * <p>The input file needs to be a {@link org.apache.hadoop.io.SequenceFile} with
+ * {@link UndirectedEdge}s as keys and any Writable as values, as it is already produced by
+ * {@link SimplifyGraphJob}</p>
+ *
+ * <p>This job outputs text files with a degree and the number of nodes having that degree
+ * per line.</p>
+ *
+ * <p>The computation runs as two consecutive MapReduce jobs: the first one counts the degree of
+ * every vertex and stores the result in the temp directory, the second one counts how many
+ * vertices share each degree.</p>
+ *
+ * <p>Command line arguments specific to this class are:</p>
+ *
+ * <ol>
+ * <li>-Dmapred.input.dir=(path): Directory containing one or more sequence files with edge
+ * data</li>
+ * <li>-Dmapred.output.dir=(path): output path where the degree distribution data should be
+ * written</li>
+ * </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ *
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before
+ * all other arguments.</p>
+ */
+public class DegreeDistributionJob extends AbstractJob {
+
+  private static final IntWritable ONE = new IntWritable(1);
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new DegreeDistributionJob(), args);
+  }
+
+  /**
+   * Runs the two MapReduce jobs one after the other.
+   *
+   * @param args the command line arguments
+   * @return 0 if both jobs completed successfully, -1 if the arguments could not be parsed or
+   *         one of the jobs failed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+
+    Map<String, String> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
+    Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex");
+
+    Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath,
+        SequenceFileInputFormat.class, DegreeOfVertexMapper.class, Vertex.class, IntWritable.class,
+        SumReducer.class, Vertex.class, IntWritable.class, SequenceFileOutputFormat.class);
+    degreesPerVertex.setCombinerClass(SumReducer.class);
+    // the second job reads what this one writes, so there is no point in going on after a failure
+    if (!degreesPerVertex.waitForCompletion(true)) {
+      return -1;
+    }
+
+    Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(),
+        SequenceFileInputFormat.class, DegreesMapper.class, IntWritable.class, IntWritable.class,
+        SumReducer.class, IntWritable.class, IntWritable.class, TextOutputFormat.class);
+    degreeDistribution.setCombinerClass(SumReducer.class);
+    if (!degreeDistribution.waitForCompletion(true)) {
+      return -1;
+    }
+
+    return 0;
+  }
+
+  /** Emits a count of one for each of the two vertices of every edge. */
+  public static class DegreeOfVertexMapper extends Mapper<UndirectedEdge,Writable,Vertex,IntWritable> {
+    @Override
+    protected void map(UndirectedEdge edge, Writable value, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(edge.getFirstVertex(), ONE);
+      ctx.write(edge.getSecondVertex(), ONE);
+    }
+  }
+
+  /** Emits a count of one keyed by the degree of every vertex; the vertex itself is dropped. */
+  public static class DegreesMapper extends Mapper<Vertex,IntWritable,IntWritable,IntWritable> {
+    @Override
+    protected void map(Vertex vertex, IntWritable degree, Context ctx)
+        throws IOException, InterruptedException {
+      ctx.write(degree, ONE);
+    }
+  }
+}
Added:
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java?rev=1138329&view=auto
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
(added)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/graph/common/DegreeDistributionJobTest.java
Wed Jun 22 07:58:47 2011
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.graph.common;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.mahout.common.MahoutTestCase;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.graph.model.UndirectedEdge;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * Runs {@link DegreeDistributionJob} end to end on a small toy graph and checks the resulting
+ * degree distribution.
+ */
+public class DegreeDistributionJobTest extends MahoutTestCase {
+
+  @Test
+  public void toyIntegrationTest() throws Exception {
+
+    File inputFile = getTestTempFile("edges.seq");
+    File outputDir = getTestTempDir("output");
+    // NOTE(review): presumably removed so that the job can create the output directory itself
+    outputDir.delete();
+    File tempDir = getTestTempDir("tmp");
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf,
+        new Path(inputFile.getAbsolutePath()), UndirectedEdge.class, NullWritable.class);
+
+    // toy graph with the degrees 0:7, 1:3, 2:3, 3:3, 4:3, 5:2, 6:1, 7:2
+    try {
+      writer.append(new UndirectedEdge(0, 1), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 4), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 6), NullWritable.get());
+      writer.append(new UndirectedEdge(0, 7), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 2), NullWritable.get());
+      writer.append(new UndirectedEdge(1, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(2, 3), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 5), NullWritable.get());
+      writer.append(new UndirectedEdge(4, 7), NullWritable.get());
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+
+    DegreeDistributionJob degreeDistributionJob = new DegreeDistributionJob();
+    degreeDistributionJob.setConf(conf);
+    int exitCode = degreeDistributionJob.run(new String[] { "--input", inputFile.getAbsolutePath(),
+        "--output", outputDir.getAbsolutePath(), "--tempDir", tempDir.getAbsolutePath() });
+    // fail right here instead of with a missing output file further down
+    assertEquals(0, exitCode);
+
+    // every output line holds a degree and the number of vertices with that degree, tab separated
+    Map<Integer,Integer> degreeDistribution = Maps.newHashMap();
+    for (String line : new FileLineIterable(new File(outputDir, "part-r-00000"))) {
+      String[] tokens = Iterables.toArray(Splitter.on("\t").split(line), String.class);
+      degreeDistribution.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+    }
+
+    assertEquals(4, degreeDistribution.size());
+    assertEquals(1, degreeDistribution.get(1).intValue());
+    assertEquals(2, degreeDistribution.get(2).intValue());
+    assertEquals(4, degreeDistribution.get(3).intValue());
+    assertEquals(1, degreeDistribution.get(7).intValue());
+  }
+
+}