Author: srowen
Date: Thu Mar 31 10:40:07 2011
New Revision: 1087245

URL: http://svn.apache.org/viewvc?rev=1087245&view=rev
Log:
MAHOUT-641: Correctly pass the Configuration into and through distributed matrix operations

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
 Thu Mar 31 10:40:07 2011
@@ -149,8 +149,14 @@ public class DistributedRowMatrix implem
       throw new CardinalityException(numRows, other.numRows());
     }
     Path outPath = new Path(outputTmpBasePath.getParent(), "productWith-" + 
(System.nanoTime() & 0xFF));
+    
+    Configuration initialConf = getConf() == null ? new Configuration() : 
getConf();
     Configuration conf =
-        MatrixMultiplicationJob.createMatrixMultiplyJobConf(rowPath, 
other.rowPath, outPath, other.numCols);
+        MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, 
+                                                            rowPath, 
+                                                            other.rowPath, 
+                                                            outPath, 
+                                                            other.numCols);
     JobClient.runJob(new JobConf(conf));
     DistributedRowMatrix out = new DistributedRowMatrix(outPath, 
outputTmpPath, numCols, other.numCols());
     out.setConf(conf);
@@ -159,7 +165,8 @@ public class DistributedRowMatrix implem
 
   public DistributedRowMatrix transpose() throws IOException {
     Path outputPath = new Path(rowPath.getParent(), "transpose-" + 
(System.nanoTime() & 0xFF));
-    Configuration conf = TransposeJob.buildTransposeJobConf(rowPath, 
outputPath, numRows);
+    Configuration initialConf = getConf() == null ? new Configuration() : 
getConf();
+    Configuration conf = TransposeJob.buildTransposeJobConf(initialConf, 
rowPath, outputPath, numRows);
     JobClient.runJob(new JobConf(conf));
     DistributedRowMatrix m = new DistributedRowMatrix(outputPath, 
outputTmpPath, numCols, numRows);
     m.setConf(this.conf);
@@ -169,8 +176,10 @@ public class DistributedRowMatrix implem
   @Override
   public Vector times(Vector v) {
     try {
+      Configuration initialConf = getConf() == null ? new Configuration() : 
getConf();
       Configuration conf =
-          TimesSquaredJob.createTimesJobConf(v,
+          TimesSquaredJob.createTimesJobConf(initialConf, 
+                                             v,
                                              numRows,
                                              rowPath,
                                              new Path(outputTmpPath, 
Long.toString(System.nanoTime())));
@@ -184,8 +193,10 @@ public class DistributedRowMatrix implem
   @Override
   public Vector timesSquared(Vector v) {
     try {
+      Configuration initialConf = getConf() == null ? new Configuration() : 
getConf();
       Configuration conf =
-          TimesSquaredJob.createTimesSquaredJobConf(v,
+          TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+                                                    v,
                                                     rowPath,
                                                     new Path(outputTmpBasePath,
                                                              new 
Path(Long.toString(System.nanoTime()))));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
 Thu Mar 31 10:40:07 2011
@@ -46,8 +46,19 @@ public class MatrixMultiplicationJob ext
 
   private static final String OUT_CARD = "output.vector.cardinality";
 
-  public static Configuration createMatrixMultiplyJobConf(Path aPath, Path 
bPath, Path outPath, int outCardinality) {
-    JobConf conf = new JobConf(MatrixMultiplicationJob.class);
+  public static Configuration createMatrixMultiplyJobConf(Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath, 
outPath, outCardinality);
+  }
+  
+  public static Configuration createMatrixMultiplyJobConf(Configuration 
initialConf, 
+                                                          Path aPath, 
+                                                          Path bPath, 
+                                                          Path outPath, 
+                                                          int outCardinality) {
+    JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class);
     conf.setInputFormat(CompositeInputFormat.class);
     conf.set("mapred.join.expr", CompositeInputFormat.compose(
           "inner", SequenceFileInputFormat.class, aPath, bPath));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
 Thu Mar 31 10:40:07 2011
@@ -60,10 +60,17 @@ public final class TimesSquaredJob {
 
   private TimesSquaredJob() { }
 
-  public static Configuration createTimesSquaredJobConf(Vector v,
+  public static Configuration createTimesSquaredJobConf(Vector v, Path 
matrixInputPath, Path outputVectorPath)
+    throws IOException {
+    return createTimesSquaredJobConf(new Configuration(), v, matrixInputPath, 
outputVectorPath);
+  }
+  
+  public static Configuration createTimesSquaredJobConf(Configuration 
initialConf,
+                                                        Vector v,
                                                         Path matrixInputPath,
                                                         Path outputVectorPath) 
throws IOException {
-    return createTimesSquaredJobConf(v,
+    return createTimesSquaredJobConf(initialConf, 
+                                     v,
                                      matrixInputPath,
                                      outputVectorPath,
                                      TimesSquaredMapper.class,
@@ -74,7 +81,16 @@ public final class TimesSquaredJob {
                                                  int outDim,
                                                  Path matrixInputPath,
                                                  Path outputVectorPath) throws 
IOException {
-    return createTimesSquaredJobConf(v,
+    return createTimesJobConf(new Configuration(), v, outDim, matrixInputPath, 
outputVectorPath);
+  }
+    
+  public static Configuration createTimesJobConf(Configuration initialConf, 
+                                                 Vector v,
+                                                 int outDim,
+                                                 Path matrixInputPath,
+                                                 Path outputVectorPath) throws 
IOException {
+    return createTimesSquaredJobConf(initialConf,
+                                     v,
                                      outDim,
                                      matrixInputPath,
                                      outputVectorPath,
@@ -82,14 +98,29 @@ public final class TimesSquaredJob {
                                      VectorSummingReducer.class);
   }
 
-
   public static Configuration createTimesSquaredJobConf(Vector v,
                                                         Path matrixInputPath,
                                                         Path 
outputVectorPathBase,
                                                         Class<? extends 
TimesSquaredMapper> mapClass,
                                                         Class<? extends 
VectorSummingReducer> redClass)
     throws IOException {
-    return createTimesSquaredJobConf(v, v.size(), matrixInputPath, 
outputVectorPathBase, mapClass, redClass);
+    return createTimesSquaredJobConf(new Configuration(), v, matrixInputPath, 
outputVectorPathBase, mapClass, redClass);
+  }
+  
+  public static Configuration createTimesSquaredJobConf(Configuration 
initialConf,
+                                                        Vector v,
+                                                        Path matrixInputPath,
+                                                        Path 
outputVectorPathBase,
+                                                        Class<? extends 
TimesSquaredMapper> mapClass,
+                                                        Class<? extends 
VectorSummingReducer> redClass)
+    throws IOException {
+    return createTimesSquaredJobConf(initialConf, 
+                                     v, 
+                                     v.size(), 
+                                     matrixInputPath, 
+                                     outputVectorPathBase, 
+                                     mapClass, 
+                                     redClass);
   }
 
   public static Configuration createTimesSquaredJobConf(Vector v,
@@ -99,7 +130,25 @@ public final class TimesSquaredJob {
                                                         Class<? extends 
TimesSquaredMapper> mapClass,
                                                         Class<? extends 
VectorSummingReducer> redClass)
     throws IOException {
-    JobConf conf = new JobConf(TimesSquaredJob.class);
+
+    return createTimesSquaredJobConf(new Configuration(),
+                                     v,
+                                     outputVectorDim,
+                                     matrixInputPath,
+                                     outputVectorPathBase,
+                                     mapClass,
+                                     redClass);
+  }
+  
+  public static Configuration createTimesSquaredJobConf(Configuration 
initialConf, 
+                                                        Vector v,
+                                                        int outputVectorDim,
+                                                        Path matrixInputPath,
+                                                        Path 
outputVectorPathBase,
+                                                        Class<? extends 
TimesSquaredMapper> mapClass,
+                                                        Class<? extends 
VectorSummingReducer> redClass)
+    throws IOException {
+    JobConf conf = new JobConf(initialConf, TimesSquaredJob.class);
     conf.setJobName("TimesSquaredJob: " + matrixInputPath);
     FileSystem fs = FileSystem.get(conf);
     matrixInputPath = fs.makeQualified(matrixInputPath);

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java 
(original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java 
Thu Mar 31 10:40:07 2011
@@ -78,7 +78,14 @@ public class TransposeJob extends Abstra
   public static Configuration buildTransposeJobConf(Path matrixInputPath,
                                                     Path matrixOutputPath,
                                                     int numInputRows) throws 
IOException {
-    JobConf conf = new JobConf(TransposeJob.class);
+    return buildTransposeJobConf(new Configuration(), matrixInputPath, 
matrixOutputPath, numInputRows);
+  }
+  
+  public static Configuration buildTransposeJobConf(Configuration initialConf,
+                                                    Path matrixInputPath,
+                                                    Path matrixOutputPath,
+                                                    int numInputRows) throws 
IOException {
+    JobConf conf = new JobConf(initialConf, TransposeJob.class);
     conf.setJobName("TransposeJob: " + matrixInputPath + " transpose -> " + 
matrixOutputPath);
     FileSystem fs = FileSystem.get(conf);
     matrixInputPath = fs.makeQualified(matrixInputPath);

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
--- 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
 (original)
+++ 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
 Thu Mar 31 10:40:07 2011
@@ -17,6 +17,11 @@
 
 package org.apache.mahout.math.hadoop;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,12 +36,9 @@ import org.apache.mahout.math.VectorWrit
 import org.apache.mahout.math.decomposer.SolverTest;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
 public final class TestDistributedRowMatrix extends MahoutTestCase {
+  public static final String TEST_PROPERTY_KEY = "test.property.key";
+  public static final String TEST_PROPERTY_VALUE = "test.property.value";
 
   private static void assertEquals(VectorIterable m, VectorIterable mtt, 
double errorTolerance) {
     Iterator<MatrixSlice> mIt = m.iterateAll();
@@ -108,6 +110,102 @@ public final class TestDistributedRowMat
     assertEquals(expected, product, EPSILON);
   }
 
+  @Test
+  public void testMatrixMultiplactionJobConfBuilder() throws Exception {    
+    Configuration initialConf = createInitialConf();
+        
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");    
+    Path aPath = new Path(baseTmpDirPath, "a");
+    Path bPath = new Path(baseTmpDirPath, "b");
+    Path outPath = new Path(baseTmpDirPath, "out");
+    
+    Configuration mmJobConf = 
MatrixMultiplicationJob.createMatrixMultiplyJobConf(aPath, bPath, outPath, 10);
+    Configuration mmCustomJobConf = 
MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf, 
+                                                                               
         aPath, 
+                                                                               
         bPath, 
+                                                                               
         outPath, 
+                                                                               
         10);
+    
+    assertNull(mmJobConf.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, mmCustomJobConf.get(TEST_PROPERTY_KEY)); 
 
+  }
+  
+  @Test
+  public void testTransposeJobConfBuilder() throws Exception {
+    Configuration initialConf = createInitialConf();
+    
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");    
+    Path inputPath = new Path(baseTmpDirPath, "input");
+    Path outputPath = new Path(baseTmpDirPath, "output");
+    
+    Configuration transposeJobConf = 
TransposeJob.buildTransposeJobConf(inputPath, outputPath, 10);
+    Configuration transposeCustomJobConf = 
TransposeJob.buildTransposeJobConf(initialConf, inputPath, outputPath, 10);
+
+    assertNull(transposeJobConf.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, 
transposeCustomJobConf.get(TEST_PROPERTY_KEY));
+  }
+
+  @Test public void testTimesSquaredJobConfBuilders() throws Exception {
+    Configuration initialConf = createInitialConf();
+
+    Path baseTmpDirPath = getTestTempDirPath("testpaths");    
+    Path inputPath = new Path(baseTmpDirPath, "input");
+    Path outputPath = new Path(baseTmpDirPath, "output");
+
+    Vector v = new RandomAccessSparseVector(50);
+    v.assign(1.0);
+
+    Configuration timesSquaredJobConf1 = 
TimesSquaredJob.createTimesSquaredJobConf(v, inputPath, outputPath);
+    Configuration customTimesSquaredJobConf1 = 
TimesSquaredJob.createTimesSquaredJobConf(initialConf, v, inputPath, 
outputPath);
+
+    assertNull(timesSquaredJobConf1.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, 
customTimesSquaredJobConf1.get(TEST_PROPERTY_KEY));
+    
+    Configuration timesJobConf = TimesSquaredJob.createTimesJobConf(v, 50, 
inputPath, outputPath);
+    Configuration customTimesJobConf = 
TimesSquaredJob.createTimesJobConf(initialConf, v, 50, inputPath, outputPath);
+    
+    assertNull(timesJobConf.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, 
customTimesJobConf.get(TEST_PROPERTY_KEY));
+    
+    Configuration timesSquaredJobConf2 = 
TimesSquaredJob.createTimesSquaredJobConf(v, 
+                                                                               
    inputPath, 
+                                                                               
    outputPath, 
+                                                                               
    TimesSquaredJob.TimesSquaredMapper.class, 
+                                                                               
    TimesSquaredJob.VectorSummingReducer.class);
+    Configuration customTimesSquaredJobConf2 = 
TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+                                                                               
          v, 
+                                                                               
          inputPath, 
+                                                                               
          outputPath, 
+                                                                               
          TimesSquaredJob.TimesSquaredMapper.class, 
+                                                                               
          TimesSquaredJob.VectorSummingReducer.class);
+ 
+    assertNull(timesSquaredJobConf2.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, 
customTimesSquaredJobConf2.get(TEST_PROPERTY_KEY));
+
+    Configuration timesSquaredJobConf3 = 
TimesSquaredJob.createTimesSquaredJobConf(v,
+                                                                               
    50,
+                                                                               
    inputPath, 
+                                                                               
    outputPath, 
+                                                                               
    TimesSquaredJob.TimesSquaredMapper.class, 
+                                                                               
    TimesSquaredJob.VectorSummingReducer.class);
+    Configuration customTimesSquaredJobConf3 = 
TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+                                                                               
          v,
+                                                                               
          50,
+                                                                               
          inputPath, 
+                                                                               
          outputPath, 
+                                                                               
          TimesSquaredJob.TimesSquaredMapper.class, 
+                                                                               
          TimesSquaredJob.VectorSummingReducer.class);
+ 
+    assertNull(timesSquaredJobConf3.get(TEST_PROPERTY_KEY));
+    assertEquals(TEST_PROPERTY_VALUE, 
customTimesSquaredJobConf3.get(TEST_PROPERTY_KEY));
+  }
+  
+  public Configuration createInitialConf() {
+    Configuration initialConf = new Configuration();
+    initialConf.set(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE);
+    return initialConf;
+  }
+  
   public DistributedRowMatrix randomDistributedMatrix(int numRows,
                                                       int nonNullRows,
                                                       int numCols,


Reply via email to