Author: ssc
Date: Mon Jul  4 13:59:13 2011
New Revision: 1142668

URL: http://svn.apache.org/viewvc?rev=1142668&view=rev
Log:
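Small cleanup of temp-directory management in the graph jobs: intermediate paths are now obtained via AbstractJob.getTempPath()/getCombinedTempPath() and named constants instead of being built by hand from --tempDir.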

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java 
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java 
Mon Jul  4 13:59:13 2011
@@ -131,6 +131,10 @@ public abstract class AbstractJob extend
     return new Path(tempPath, directory);
   }
 
+  protected Path getCombinedTempPath(String directory1, String directory2) {
+    return new Path(new Path(tempPath, directory1) + "," + new Path(tempPath, 
directory2));
+  }
+
   /** Add an option with no argument whose presence can be checked for using
    *  {@code containsKey} method on the map returned by {@link 
#parseArguments(String[])};
    */

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/AugmentGraphWithDegreesJob.java
 Mon Jul  4 13:59:13 2011
@@ -43,6 +43,8 @@ import org.apache.mahout.graph.model.Ver
  */
 public class AugmentGraphWithDegreesJob extends AbstractJob {
 
+  public static final String TMP_AUGMENTED_EDGES = "augmented-edges";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new AugmentGraphWithDegreesJob(), args);
   }
@@ -58,22 +60,19 @@ public class AugmentGraphWithDegreesJob 
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
     Path inputPath = getInputPath();
-    Path augmentedEdgesPath = new Path(tempDirPath, "augmented-edges");
     Path outputPath = getOutputPath();
 
     // scatter the edges to each of the vertices and count degree
-    Job scatter = prepareJob(inputPath, augmentedEdgesPath, 
SequenceFileInputFormat.class, ScatterEdgesMapper.class,
-        Vertex.class, Vertex.class, SumDegreesReducer.class, 
UndirectedEdge.class, VertexWithDegree.class,
-        SequenceFileOutputFormat.class);
+    Job scatter = prepareJob(inputPath, getTempPath(TMP_AUGMENTED_EDGES), 
SequenceFileInputFormat.class,
+        ScatterEdgesMapper.class, Vertex.class, Vertex.class, 
SumDegreesReducer.class, UndirectedEdge.class,
+        VertexWithDegree.class, SequenceFileOutputFormat.class);
     scatter.waitForCompletion(true);
 
     // join augmented edges with partial degree information to to complete 
records
-    Job join = prepareJob(augmentedEdgesPath, outputPath, 
SequenceFileInputFormat.class, Mapper.class,
-        UndirectedEdge.class, VertexWithDegree.class, 
JoinDegreesReducer.class, UndirectedEdgeWithDegrees.class,
-        NullWritable.class, SequenceFileOutputFormat.class);
+    Job join = prepareJob(getTempPath(TMP_AUGMENTED_EDGES), outputPath, 
SequenceFileInputFormat.class,
+        Mapper.class, UndirectedEdge.class, VertexWithDegree.class, 
JoinDegreesReducer.class,
+        UndirectedEdgeWithDegrees.class, NullWritable.class, 
SequenceFileOutputFormat.class);
     join.waitForCompletion(true);
 
     return 0;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/DegreeDistributionJob.java
 Mon Jul  4 13:59:13 2011
@@ -17,7 +17,6 @@
 
 package org.apache.mahout.graph.common;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
@@ -56,6 +55,8 @@ import java.util.Map;
  */
 public class DegreeDistributionJob extends AbstractJob {
 
+  public static final String TMP_DEGREES_PER_VERTEX = "degreesPerVertex";
+
   private static final IntWritable ONE = new IntWritable(1);
 
   public static void main(String[] args) throws Exception {
@@ -72,18 +73,15 @@ public class DegreeDistributionJob exten
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-    Path degreesPerVertexPath = new Path(tempDirPath, "degreesPerVertex");
-
-    Job degreesPerVertex = prepareJob(getInputPath(), degreesPerVertexPath, 
SequenceFileInputFormat.class,
-        DegreeOfVertexMapper.class, Vertex.class, IntWritable.class, 
IntSumReducer.class, Vertex.class,
-        IntWritable.class, SequenceFileOutputFormat.class);
+    Job degreesPerVertex = prepareJob(getInputPath(), 
getTempPath(TMP_DEGREES_PER_VERTEX),
+        SequenceFileInputFormat.class, DegreeOfVertexMapper.class, 
Vertex.class, IntWritable.class, IntSumReducer.class,
+        Vertex.class, IntWritable.class, SequenceFileOutputFormat.class);
     degreesPerVertex.setCombinerClass(IntSumReducer.class);
     degreesPerVertex.waitForCompletion(true);
 
-    Job degreeDistribution = prepareJob(degreesPerVertexPath, getOutputPath(), 
SequenceFileInputFormat.class,
-        DegreesMapper.class, IntWritable.class, IntWritable.class, 
IntSumReducer.class, IntWritable.class,
-        IntWritable.class, TextOutputFormat.class);
+    Job degreeDistribution = prepareJob(getTempPath(TMP_DEGREES_PER_VERTEX), 
getOutputPath(),
+        SequenceFileInputFormat.class, DegreesMapper.class, IntWritable.class, 
IntWritable.class, IntSumReducer.class,
+        IntWritable.class, IntWritable.class, TextOutputFormat.class);
     degreeDistribution.setCombinerClass(IntSumReducer.class);
     degreeDistribution.waitForCompletion(true);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/LocalClusteringCoefficientJob.java
 Mon Jul  4 13:59:13 2011
@@ -64,6 +64,9 @@ import java.util.Map;
  */
 public class LocalClusteringCoefficientJob extends AbstractJob {
 
+  public static final String TMP_EDGES_PER_VERTEX = "edgesPerVertex";
+  public static final String TMP_TRIANGLES_PER_VERTEX = "trianglesPerVertex";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new LocalClusteringCoefficientJob(), args);
   }
@@ -83,27 +86,23 @@ public class LocalClusteringCoefficientJ
     Path edgesPath = new Path(parsedArgs.get("--edges"));
     Path trianglesPath = new Path(parsedArgs.get("--triangles"));
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
-    Path edgesPerVertex = new Path(tempDirPath, "edgesPerVertex");
-    Path trianglesPerVertex = new Path(tempDirPath, "trianglesPerVertex");
-
     // unfortunately we don't have access to an undeprecated MultipleInputs, 
so we need several M/R steps instead of one...
-    Job countEdgesPerVertex = prepareJob(edgesPath, edgesPerVertex, 
SequenceFileInputFormat.class,
-        EdgeCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, 
Reducer.class, Vertex.class,
-        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    Job countEdgesPerVertex = prepareJob(edgesPath, 
getTempPath(TMP_EDGES_PER_VERTEX),
+        SequenceFileInputFormat.class, EdgeCountMapper.class, Vertex.class, 
TriangleOrEdgeCount.class, Reducer.class,
+        Vertex.class, TriangleOrEdgeCount.class, 
SequenceFileOutputFormat.class);
     countEdgesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     countEdgesPerVertex.waitForCompletion(true);
 
-    Job countTrianglesPerVertex = prepareJob(trianglesPath, 
trianglesPerVertex, SequenceFileInputFormat.class,
-        TriangleCountMapper.class, Vertex.class, TriangleOrEdgeCount.class, 
Reducer.class, Vertex.class,
-        TriangleOrEdgeCount.class, SequenceFileOutputFormat.class);
+    Job countTrianglesPerVertex = prepareJob(trianglesPath, 
getTempPath(TMP_TRIANGLES_PER_VERTEX),
+        SequenceFileInputFormat.class, TriangleCountMapper.class, 
Vertex.class, TriangleOrEdgeCount.class,
+        Reducer.class, Vertex.class, TriangleOrEdgeCount.class, 
SequenceFileOutputFormat.class);
     
countTrianglesPerVertex.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     countTrianglesPerVertex.waitForCompletion(true);
 
-    Job computeLocalClusteringCoefficient = prepareJob(new Path(edgesPerVertex 
+ "," + trianglesPerVertex),
-        getOutputPath(), SequenceFileInputFormat.class, Mapper.class, 
Vertex.class, TriangleOrEdgeCount.class,
-        LocalClusteringCoefficientReducer.class, LongWritable.class, 
DoubleWritable.class, TextOutputFormat.class);
+    Job computeLocalClusteringCoefficient = 
prepareJob(getCombinedTempPath(TMP_EDGES_PER_VERTEX,
+        TMP_TRIANGLES_PER_VERTEX), getOutputPath(), 
SequenceFileInputFormat.class, Mapper.class,
+        Vertex.class, TriangleOrEdgeCount.class, 
LocalClusteringCoefficientReducer.class, LongWritable.class,
+        DoubleWritable.class, TextOutputFormat.class);
     
computeLocalClusteringCoefficient.setCombinerClass(TriangleOrEdgeCountCombiner.class);
     computeLocalClusteringCoefficient.waitForCompletion(true);
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/common/SimplifyGraphJob.java
 Mon Jul  4 13:59:13 2011
@@ -20,7 +20,6 @@ package org.apache.mahout.graph.common;
 import java.io.IOException;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -64,10 +63,7 @@ public class SimplifyGraphJob extends Ab
       return -1;
     }
 
-    Path inputPath = getInputPath();
-    Path outputPath = getOutputPath();
-
-    Job simplify = prepareJob(inputPath, outputPath, TextInputFormat.class, 
SimplifyGraphMapper.class,
+    Job simplify = prepareJob(getInputPath(), getOutputPath(), 
TextInputFormat.class, SimplifyGraphMapper.class,
         UndirectedEdge.class, NullWritable.class, SimplifyGraphReducer.class, 
UndirectedEdge.class, NullWritable.class,
         SequenceFileOutputFormat.class);
     simplify.waitForCompletion(true);
@@ -79,6 +75,7 @@ public class SimplifyGraphJob extends Ab
   public static class SimplifyGraphMapper extends Mapper<Object, Text, 
UndirectedEdge, NullWritable> {
 
     private static final Pattern SEPARATOR = Pattern.compile(",");
+
     @Override
     public void map(Object key, Text line, Context ctx) throws IOException, 
InterruptedException {
       try {

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java?rev=1142668&r1=1142667&r2=1142668&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/graph/triangles/EnumerateTrianglesJob.java
 Mon Jul  4 13:59:13 2011
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -41,6 +40,9 @@ import org.apache.mahout.graph.model.Ver
 /** Enumerates all triangles of an undirected graph. */
 public class EnumerateTrianglesJob extends AbstractJob {
 
+  public static final String TMP_CLOSING_EDGES = "closingEdges";
+  public static final String TMP_OPEN_TRIADS = "openTriads";
+
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new EnumerateTrianglesJob(), args);
   }
@@ -55,29 +57,22 @@ public class EnumerateTrianglesJob exten
       return -1;
     }
 
-    Path tempDirPath = new Path(parsedArgs.get("--tempDir"));
-
-    Path inputPath = getInputPath();
-    Path joinableInputPath = new Path(tempDirPath, "joinableInput");
-    Path triadsPath = new Path(tempDirPath, "triangles");
-    Path outputPath = getOutputPath();
-
     // scatter the edges to lower degree vertex and build open triads
-    Job scatter = prepareJob(inputPath, triadsPath, 
SequenceFileInputFormat.class,
+    Job scatter = prepareJob(getInputPath(), getTempPath(TMP_OPEN_TRIADS), 
SequenceFileInputFormat.class,
         ScatterEdgesToLowerDegreeVertexMapper.class, Vertex.class, 
Vertex.class,
         BuildOpenTriadsReducer.class, JoinableUndirectedEdge.class, 
VertexOrMarker.class,
         SequenceFileOutputFormat.class);
     scatter.waitForCompletion(true);
 
     // necessary as long as we don't have access to an undeprecated 
MultipleInputs
-    Job prepareInput = prepareJob(inputPath, joinableInputPath, 
SequenceFileInputFormat.class, PrepareInputMapper.class,
-        JoinableUndirectedEdge.class, VertexOrMarker.class, Reducer.class, 
JoinableUndirectedEdge.class,
-        VertexOrMarker.class, SequenceFileOutputFormat.class);
+    Job prepareInput = prepareJob(getInputPath(), 
getTempPath(TMP_CLOSING_EDGES), SequenceFileInputFormat.class,
+        PrepareInputMapper.class, JoinableUndirectedEdge.class, 
VertexOrMarker.class, Reducer.class,
+        JoinableUndirectedEdge.class, VertexOrMarker.class, 
SequenceFileOutputFormat.class);
     
prepareInput.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);
     prepareInput.waitForCompletion(true);
 
     //join opentriads and edges pairwise to get all triangles
-    Job joinTriads = prepareJob(new Path(triadsPath + "," + 
joinableInputPath), outputPath,
+    Job joinTriads = prepareJob(getCombinedTempPath(TMP_OPEN_TRIADS, 
TMP_CLOSING_EDGES), getOutputPath(),
         SequenceFileInputFormat.class, Mapper.class, 
JoinableUndirectedEdge.class, VertexOrMarker.class,
         JoinTrianglesReducer.class, Triangle.class, NullWritable.class, 
SequenceFileOutputFormat.class);
     
joinTriads.setGroupingComparatorClass(JoinableUndirectedEdge.GroupingComparator.class);


Reply via email to