Author: ssc
Date: Mon Jul 4 13:59:13 2011
New Revision: 1142668
URL: http://svn.apache.org/viewvc?rev=1142668&view=rev
Log:
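Small cleanup of temp-directory management in the graph jobs: use the inherited AbstractJob.getTempPath() and a new getCombinedTempPath() helper instead of building paths from the --tempDir option by hand.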
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jul 4 13:59:13 2011
@@ -131,6 +131,10 @@ public abstract class AbstractJob extend
return new Path(tempPath, directory);
}
+ protected Path getCombinedTempPath(String directory1, String directory2) {
+ return new Path(new Path(tempPath, directory1) + "," + new Path(tempPath,
directory2));
+ }
+
/** Add an option with no argument whose presence can be checked for using
* {@code containsKey} method on the map returned by {@link
#parseArguments(String[])};
*/
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java Mon Jul 4 13:59:13 2011
@@ -43,6 +43,8 @@ import org.apache.mahout.graph.model.Ver
*/
public class AugmentGraphWithDegreesJob extends AbstractJob {
+ public static final String TMP_AUGMENTED_EDGES = "augmented-edges";
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new AugmentGraphWithDegreesJob(), args);
}
@@ -58,22 +60,19 @@ public class AugmentGraphWithDegreesJob
return -1;
}
- Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
Path inputPath = getInputPath();
- Path augmentedEdgesPath = new Path(tempDirPath, "augmented-edges");
Path outputPath = getOutputPath();
// scatter the edges to each of the vertices and count degree
- Job scatter = prepareJob(inputPath, augmentedEdgesPath,
SequenceFileInputFormat.class, ScatterEdgesMapper.class,
- Vertex.class, Vertex.class, SumDegreesReducer.class,
UndirectedEdge.class, VertexWithDegree.class,
- SequenceFileOutputFormat.class);
+ Job scatter = prepareJob(inputPath, getTempPath(TMP_AUGMENTED_EDGES),
SequenceFileInputFormat.class,
+ ScatterEdgesMapper.class, Vertex.class, Vertex.class,
SumDegreesReducer.class, UndirectedEdge.class,
+ VertexWithDegree.class, SequenceFileOutputFormat.class);
scatter.waitForCompletion(true);
// join augmented edges with partial degree information to to complete
records
- Job join = prepareJob(augmentedEdgesPath, outputPath,
SequenceFileInputFormat.class, Mapper.class,
- UndirectedEdge.class, VertexWithDegree.class,
JoinDegreesReducer.class, UndirectedEdgeWithDegrees.class,
- NullWritable.class, SequenceFileOutputFormat.class);
+ Job join = prepareJob(getTempPath(TMP_AUGMENTED_EDGES), outputPath,
SequenceFileInputFormat.class,
+ Mapper.class, UndirectedEdge.class, VertexWithDegree.class,
JoinDegreesReducer.class,
+ UndirectedEdgeWithDegrees.class, NullWritable.class,
SequenceFileOutputFormat.class);
join.waitForCompletion(true);
return 0;
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java Mon Jul 4 13:59:13 2011
@@ -17,7 +17,6 @@
package org.apache.mahout.graph.common;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
@@ -56,6 +55,8 @@ import java.util.Map;
*/
public class DegreeDistributionJob extends AbstractJob {
+ public static final String TMP_DEGREES_PER_VERTEX = "degreesPerVertex";
+
private static final IntWritable ONE = new IntWritable(1);
public static void main(String[] args) throws Exception {
@@ -72,18 +73,15 @@ public class DegreeDistributionJob exten
return -1;
}
- Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
- Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex");
-
- Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath,
SequenceFileInputFormat.class,
- DegreeOfVertexMapper.class, Vertex.class, IntWritable.class,
IntSumReducer.class, Vertex.class,
- IntWritable.class, SequenceFileOutputFormat.class);
+ Job degreesPerVertex = prepareJob(getInputPath(),
getTempPath(TMP_DEGREES_PER_VERTEX),
+ SequenceFileInputFormat.class, DegreeOfVertexMapper.class,
Vertex.class, IntWritable.class, IntSumReducer.class,
+ Vertex.class, IntWritable.class, SequenceFileOutputFormat.class);
degreesPerVertex.setCombinerClass(IntSumReducer.class);
degreesPerVertex.waitForCompletion(true);
- Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(),
SequenceFileInputFormat.class,
- DegreesMapper.class, IntWritable.class, IntWritable.class,
IntSumReducer.class, IntWritable.class,
- IntWritable.class, TextOutputFormat.class);
+ Job degreeDistribution = prepareJob(getTempPath(TMP_DEGREES_PER_VERTEX),
getOutputPath(),
+ SequenceFileInputFormat.class, DegreesMapper.class, IntWritable.class,
IntWritable.class, IntSumReducer.class,
+ IntWritable.class, IntWritable.class, TextOutputFormat.class);
degreeDistribution.setCombinerClass(IntSumReducer.class);
degreeDistribution.waitForCompletion(true);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java Mon Jul 4 13:59:13 2011
@@ -64,6 +64,9 @@ import java.util.Map;
*/
public class LocalClusteringCoefficientJob extends AbstractJob {
+ public static final String TMP_EDGES_PER_VERTEX = "edgesPerVertex";
+ public static final String TMP_TRIANGLES_PER_VERTEX = "trianglesPerVertex";
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new LocalClusteringCoefficientJob(), args);
}
@@ -83,27 +86,23 @@ public class LocalClusteringCoefficientJ
Path edgesPath = new Path(parsedArgs.get("--edges"));
Path trianglesPath = new Path(parsedArgs.get("--triangles"));
- Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
- Path edgesPerVertex = new Path(tempDirPath, "edgesPerVertex");
- Path trianglesPerVertex = new Path(tempDirPath, "trianglesPerVertex");
-
// unfortunately we don't have access to an undeprecated MultipleInputs,
so we need several M/R steps instead of one...
- Job countEdgesPerVertex = prepareJob(edgesPath, edgesPerVertex,
SequenceFileInputFormat.class,
- EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class,
Reducer.class, Vertex.class,
- TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+ Job countEdgesPerVertex = prepareJob(edgesPath,
getTempPath(TMP_EDGES_PER_VERTEX),
+ SequenceFileInputFormat.class, EdgeCountMapper.class, Vertex.class,
TriangleOrEdgeCount.class, Reducer.class,
+ Vertex.class, TriangleOrEdgeCount.class,
SequenceFileOutputFormat.class);
countEdgesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
countEdgesPerVertex.waitForCompletion(true);
- Job countTrianglesPerVertex = prepareJob(trianglesPath,
trianglesPerVertex, SequenceFileInputFormat.class,
- TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class,
Reducer.class, Vertex.class,
- TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+ Job countTrianglesPerVertex = prepareJob(trianglesPath,
getTempPath(TMP_TRIANGLES_PER_VERTEX),
+ SequenceFileInputFormat.class, TriangleCountMapper.class,
Vertex.class, TriangleOrEdgeCount.class,
+ Reducer.class, Vertex.class, TriangleOrEdgeCount.class,
SequenceFileOutputFormat.class);
countTrianglesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
countTrianglesPerVertex.waitForCompletion(true);
- Job computeLocalClusteringCoefficient = prepareJob(new Path(edgesPerVertex
+ "," + trianglesPerVertex),
- getOutputPath(), SequenceFileInputFormat.class, Mapper.class,
Vertex.class, TriangleOrEdgeCount.class,
- LocalClusteringCoefficientReducer.class, LongWritable.class,
DoubleWritable.class, TextOutputFormat.class);
+ Job computeLocalClusteringCoefficient =
prepareJob(getCombinedTempPath(TMP_EDGES_PER_VERTEX,
+ TMP_TRIANGLES_PER_VERTEX), getOutputPath(),
SequenceFileInputFormat.class, Mapper.class,
+ Vertex.class, TriangleOrEdgeCount.class,
LocalClusteringCoefficientReducer.class, LongWritable.class,
+ DoubleWritable.class, TextOutputFormat.class);
computeLocalClusteringCoefficient.setCombinerClass(TriangleOrEdgeCountCombiner.class);
computeLocalClusteringCoefficient.waitForCompletion(true);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java Mon Jul 4 13:59:13 2011
@@ -20,7 +20,6 @@ package org.apache.mahout.graph.common;
import java.io.IOException;
import java.util.regex.Pattern;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
@@ -64,10 +63,7 @@ public class SimplifyGraphJob extends Ab
return -1;
}
- Path inputPath = getInputPath();
- Path outputPath = getOutputPath();
-
- Job simplify = prepareJob(inputPath, outputPath, TextInputFormat.class,
SimplifyGraphMapper.class,
+ Job simplify = prepareJob(getInputPath(), getOutputPath(),
TextInputFormat.class, SimplifyGraphMapper.class,
UndirectedEdge.class, NullWritable.class, SimplifyGraphReducer.class,
UndirectedEdge.class, NullWritable.class,
SequenceFileOutputFormat.class);
simplify.waitForCompletion(true);
@@ -79,6 +75,7 @@ public class SimplifyGraphJob extends Ab
public static class SimplifyGraphMapper extends Mapper<Object, Text,
UndirectedEdge, NullWritable> {
private static final Pattern SEPARATOR = Pattern.compile(",");
+
@Override
public void map(Object key, Text line, Context ctx) throws IOException,
InterruptedException {
try {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java Mon Jul 4 13:59:13 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
@@ -41,6 +40,9 @@ import org.apache.mahout.graph.model.Ver
/** Enumerates all triangles of an undirected graph. */
public class EnumerateTrianglesJob extends AbstractJob {
+ public static final String TMP_CLOSING_EDGES = "closingEdges";
+ public static final String TMP_OPEN_TRIADS = "openTriads";
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new EnumerateTrianglesJob(), args);
}
@@ -55,29 +57,22 @@ public class EnumerateTrianglesJob exten
return -1;
}
- Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
- Path inputPath = getInputPath();
- Path joinableInputPath = new Path(tempDirPath, "joinableInput");
- Path triadsPath = new Path(tempDirPath, "triangles");
- Path outputPath = getOutputPath();
-
// scatter the edges to lower degree vertex and build open triads
- Job scatter = prepareJob(inputPath, triadsPath,
SequenceFileInputFormat.class,
+ Job scatter = prepareJob(getInputPath(), getTempPath(TMP_OPEN_TRIADS),
SequenceFileInputFormat.class,
ScatterEdgesToLowerDegreeVertexMapper.class, Vertex.class,
Vertex.class,
BuildOpenTriadsReducer.class, JoinableUndirectedEdge.class,
VertexOrMarker.class,
SequenceFileOutputFormat.class);
scatter.waitForCompletion(true);
// necessary as long as we don't have access to an undeprecated
MultipleInputs
- Job prepareInput = prepareJob(inputPath, joinableInputPath,
SequenceFileInputFormat.class, PrepareInputMapper.class,
- JoinableUndirectedEdge.class, VertexOrMarker.class, Reducer.class,
JoinableUndirectedEdge.class,
- VertexOrMarker.class, SequenceFileOutputFormat.class);
+ Job prepareInput = prepareJob(getInputPath(),
getTempPath(TMP_CLOSING_EDGES), SequenceFileInputFormat.class,
+ PrepareInputMapper.class, JoinableUndirectedEdge.class,
VertexOrMarker.class, Reducer.class,
+ JoinableUndirectedEdge.class, VertexOrMarker.class,
SequenceFileOutputFormat.class);
prepareInput.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);
prepareInput.waitForCompletion(true);
//join opentriads and edges pairwise to get all triangles
- Job joinTriads = prepareJob(new Path(triadsPath + "," +
joinableInputPath), outputPath,
+ Job joinTriads = prepareJob(getCombinedTempPath(TMP_OPEN_TRIADS,
TMP_CLOSING_EDGES), getOutputPath(),
SequenceFileInputFormat.class, Mapper.class,
JoinableUndirectedEdge.class, VertexOrMarker.class,
JoinTrianglesReducer.class, Triangle.class, NullWritable.class,
SequenceFileOutputFormat.class);
joinTriads.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);