Author: srowen
Date: Thu Mar 31 10:40:07 2011
New Revision: 1087245
URL: http://svn.apache.org/viewvc?rev=1087245&view=rev
Log:
MAHOUT-641 Pass Configuration into and through distributed matrix operations
correctly
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
Thu Mar 31 10:40:07 2011
@@ -149,8 +149,14 @@ public class DistributedRowMatrix implem
throw new CardinalityException(numRows, other.numRows());
}
Path outPath = new Path(outputTmpBasePath.getParent(), "productWith-" +
(System.nanoTime() & 0xFF));
+
+ Configuration initialConf = getConf() == null ? new Configuration() :
getConf();
Configuration conf =
- MatrixMultiplicationJob.createMatrixMultiplyJobConf(rowPath,
other.rowPath, outPath, other.numCols);
+ MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+ rowPath,
+ other.rowPath,
+ outPath,
+ other.numCols);
JobClient.runJob(new JobConf(conf));
DistributedRowMatrix out = new DistributedRowMatrix(outPath,
outputTmpPath, numCols, other.numCols());
out.setConf(conf);
@@ -159,7 +165,8 @@ public class DistributedRowMatrix implem
public DistributedRowMatrix transpose() throws IOException {
Path outputPath = new Path(rowPath.getParent(), "transpose-" +
(System.nanoTime() & 0xFF));
- Configuration conf = TransposeJob.buildTransposeJobConf(rowPath,
outputPath, numRows);
+ Configuration initialConf = getConf() == null ? new Configuration() :
getConf();
+ Configuration conf = TransposeJob.buildTransposeJobConf(initialConf,
rowPath, outputPath, numRows);
JobClient.runJob(new JobConf(conf));
DistributedRowMatrix m = new DistributedRowMatrix(outputPath,
outputTmpPath, numCols, numRows);
m.setConf(this.conf);
@@ -169,8 +176,10 @@ public class DistributedRowMatrix implem
@Override
public Vector times(Vector v) {
try {
+ Configuration initialConf = getConf() == null ? new Configuration() :
getConf();
Configuration conf =
- TimesSquaredJob.createTimesJobConf(v,
+ TimesSquaredJob.createTimesJobConf(initialConf,
+ v,
numRows,
rowPath,
new Path(outputTmpPath,
Long.toString(System.nanoTime())));
@@ -184,8 +193,10 @@ public class DistributedRowMatrix implem
@Override
public Vector timesSquared(Vector v) {
try {
+ Configuration initialConf = getConf() == null ? new Configuration() :
getConf();
Configuration conf =
- TimesSquaredJob.createTimesSquaredJobConf(v,
+ TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+ v,
rowPath,
new Path(outputTmpBasePath,
new
Path(Long.toString(System.nanoTime()))));
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixMultiplicationJob.java
Thu Mar 31 10:40:07 2011
@@ -46,8 +46,19 @@ public class MatrixMultiplicationJob ext
private static final String OUT_CARD = "output.vector.cardinality";
- public static Configuration createMatrixMultiplyJobConf(Path aPath, Path
bPath, Path outPath, int outCardinality) {
- JobConf conf = new JobConf(MatrixMultiplicationJob.class);
+ public static Configuration createMatrixMultiplyJobConf(Path aPath,
+ Path bPath,
+ Path outPath,
+ int outCardinality) {
+ return createMatrixMultiplyJobConf(new Configuration(), aPath, bPath,
outPath, outCardinality);
+ }
+
+ public static Configuration createMatrixMultiplyJobConf(Configuration
initialConf,
+ Path aPath,
+ Path bPath,
+ Path outPath,
+ int outCardinality) {
+ JobConf conf = new JobConf(initialConf, MatrixMultiplicationJob.class);
conf.setInputFormat(CompositeInputFormat.class);
conf.set("mapred.join.expr", CompositeInputFormat.compose(
"inner", SequenceFileInputFormat.class, aPath, bPath));
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TimesSquaredJob.java
Thu Mar 31 10:40:07 2011
@@ -60,10 +60,17 @@ public final class TimesSquaredJob {
private TimesSquaredJob() { }
- public static Configuration createTimesSquaredJobConf(Vector v,
+ public static Configuration createTimesSquaredJobConf(Vector v, Path
matrixInputPath, Path outputVectorPath)
+ throws IOException {
+ return createTimesSquaredJobConf(new Configuration(), v, matrixInputPath,
outputVectorPath);
+ }
+
+ public static Configuration createTimesSquaredJobConf(Configuration
initialConf,
+ Vector v,
Path matrixInputPath,
Path outputVectorPath)
throws IOException {
- return createTimesSquaredJobConf(v,
+ return createTimesSquaredJobConf(initialConf,
+ v,
matrixInputPath,
outputVectorPath,
TimesSquaredMapper.class,
@@ -74,7 +81,16 @@ public final class TimesSquaredJob {
int outDim,
Path matrixInputPath,
Path outputVectorPath) throws
IOException {
- return createTimesSquaredJobConf(v,
+ return createTimesJobConf(new Configuration(), v, outDim, matrixInputPath,
outputVectorPath);
+ }
+
+ public static Configuration createTimesJobConf(Configuration initialConf,
+ Vector v,
+ int outDim,
+ Path matrixInputPath,
+ Path outputVectorPath) throws
IOException {
+ return createTimesSquaredJobConf(initialConf,
+ v,
outDim,
matrixInputPath,
outputVectorPath,
@@ -82,14 +98,29 @@ public final class TimesSquaredJob {
VectorSummingReducer.class);
}
-
public static Configuration createTimesSquaredJobConf(Vector v,
Path matrixInputPath,
Path
outputVectorPathBase,
Class<? extends
TimesSquaredMapper> mapClass,
Class<? extends
VectorSummingReducer> redClass)
throws IOException {
- return createTimesSquaredJobConf(v, v.size(), matrixInputPath,
outputVectorPathBase, mapClass, redClass);
+ return createTimesSquaredJobConf(new Configuration(), v, matrixInputPath,
outputVectorPathBase, mapClass, redClass);
+ }
+
+ public static Configuration createTimesSquaredJobConf(Configuration
initialConf,
+ Vector v,
+ Path matrixInputPath,
+ Path
outputVectorPathBase,
+ Class<? extends
TimesSquaredMapper> mapClass,
+ Class<? extends
VectorSummingReducer> redClass)
+ throws IOException {
+ return createTimesSquaredJobConf(initialConf,
+ v,
+ v.size(),
+ matrixInputPath,
+ outputVectorPathBase,
+ mapClass,
+ redClass);
}
public static Configuration createTimesSquaredJobConf(Vector v,
@@ -99,7 +130,25 @@ public final class TimesSquaredJob {
Class<? extends
TimesSquaredMapper> mapClass,
Class<? extends
VectorSummingReducer> redClass)
throws IOException {
- JobConf conf = new JobConf(TimesSquaredJob.class);
+
+ return createTimesSquaredJobConf(new Configuration(),
+ v,
+ outputVectorDim,
+ matrixInputPath,
+ outputVectorPathBase,
+ mapClass,
+ redClass);
+ }
+
+ public static Configuration createTimesSquaredJobConf(Configuration
initialConf,
+ Vector v,
+ int outputVectorDim,
+ Path matrixInputPath,
+ Path
outputVectorPathBase,
+ Class<? extends
TimesSquaredMapper> mapClass,
+ Class<? extends
VectorSummingReducer> redClass)
+ throws IOException {
+ JobConf conf = new JobConf(initialConf, TimesSquaredJob.class);
conf.setJobName("TimesSquaredJob: " + matrixInputPath);
FileSystem fs = FileSystem.get(conf);
matrixInputPath = fs.makeQualified(matrixInputPath);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
Thu Mar 31 10:40:07 2011
@@ -78,7 +78,14 @@ public class TransposeJob extends Abstra
public static Configuration buildTransposeJobConf(Path matrixInputPath,
Path matrixOutputPath,
int numInputRows) throws
IOException {
- JobConf conf = new JobConf(TransposeJob.class);
+ return buildTransposeJobConf(new Configuration(), matrixInputPath,
matrixOutputPath, numInputRows);
+ }
+
+ public static Configuration buildTransposeJobConf(Configuration initialConf,
+ Path matrixInputPath,
+ Path matrixOutputPath,
+ int numInputRows) throws
IOException {
+ JobConf conf = new JobConf(initialConf, TransposeJob.class);
conf.setJobName("TransposeJob: " + matrixInputPath + " transpose -> " +
matrixOutputPath);
FileSystem fs = FileSystem.get(conf);
matrixInputPath = fs.makeQualified(matrixInputPath);
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java?rev=1087245&r1=1087244&r2=1087245&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
Thu Mar 31 10:40:07 2011
@@ -17,6 +17,11 @@
package org.apache.mahout.math.hadoop;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,12 +36,9 @@ import org.apache.mahout.math.VectorWrit
import org.apache.mahout.math.decomposer.SolverTest;
import org.junit.Test;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
public final class TestDistributedRowMatrix extends MahoutTestCase {
+ public static final String TEST_PROPERTY_KEY = "test.property.key";
+ public static final String TEST_PROPERTY_VALUE = "test.property.value";
private static void assertEquals(VectorIterable m, VectorIterable mtt,
double errorTolerance) {
Iterator<MatrixSlice> mIt = m.iterateAll();
@@ -108,6 +110,102 @@ public final class TestDistributedRowMat
assertEquals(expected, product, EPSILON);
}
+ @Test
+ public void testMatrixMultiplactionJobConfBuilder() throws Exception {
+ Configuration initialConf = createInitialConf();
+
+ Path baseTmpDirPath = getTestTempDirPath("testpaths");
+ Path aPath = new Path(baseTmpDirPath, "a");
+ Path bPath = new Path(baseTmpDirPath, "b");
+ Path outPath = new Path(baseTmpDirPath, "out");
+
+ Configuration mmJobConf =
MatrixMultiplicationJob.createMatrixMultiplyJobConf(aPath, bPath, outPath, 10);
+ Configuration mmCustomJobConf =
MatrixMultiplicationJob.createMatrixMultiplyJobConf(initialConf,
+
aPath,
+
bPath,
+
outPath,
+
10);
+
+ assertNull(mmJobConf.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE, mmCustomJobConf.get(TEST_PROPERTY_KEY));
+ }
+
+ @Test
+ public void testTransposeJobConfBuilder() throws Exception {
+ Configuration initialConf = createInitialConf();
+
+ Path baseTmpDirPath = getTestTempDirPath("testpaths");
+ Path inputPath = new Path(baseTmpDirPath, "input");
+ Path outputPath = new Path(baseTmpDirPath, "output");
+
+ Configuration transposeJobConf =
TransposeJob.buildTransposeJobConf(inputPath, outputPath, 10);
+ Configuration transposeCustomJobConf =
TransposeJob.buildTransposeJobConf(initialConf, inputPath, outputPath, 10);
+
+ assertNull(transposeJobConf.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE,
transposeCustomJobConf.get(TEST_PROPERTY_KEY));
+ }
+
+ @Test public void testTimesSquaredJobConfBuilders() throws Exception {
+ Configuration initialConf = createInitialConf();
+
+ Path baseTmpDirPath = getTestTempDirPath("testpaths");
+ Path inputPath = new Path(baseTmpDirPath, "input");
+ Path outputPath = new Path(baseTmpDirPath, "output");
+
+ Vector v = new RandomAccessSparseVector(50);
+ v.assign(1.0);
+
+ Configuration timesSquaredJobConf1 =
TimesSquaredJob.createTimesSquaredJobConf(v, inputPath, outputPath);
+ Configuration customTimesSquaredJobConf1 =
TimesSquaredJob.createTimesSquaredJobConf(initialConf, v, inputPath,
outputPath);
+
+ assertNull(timesSquaredJobConf1.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE,
customTimesSquaredJobConf1.get(TEST_PROPERTY_KEY));
+
+ Configuration timesJobConf = TimesSquaredJob.createTimesJobConf(v, 50,
inputPath, outputPath);
+ Configuration customTimesJobConf =
TimesSquaredJob.createTimesJobConf(initialConf, v, 50, inputPath, outputPath);
+
+ assertNull(timesJobConf.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE,
customTimesJobConf.get(TEST_PROPERTY_KEY));
+
+ Configuration timesSquaredJobConf2 =
TimesSquaredJob.createTimesSquaredJobConf(v,
+
inputPath,
+
outputPath,
+
TimesSquaredJob.TimesSquaredMapper.class,
+
TimesSquaredJob.VectorSummingReducer.class);
+ Configuration customTimesSquaredJobConf2 =
TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+
v,
+
inputPath,
+
outputPath,
+
TimesSquaredJob.TimesSquaredMapper.class,
+
TimesSquaredJob.VectorSummingReducer.class);
+
+ assertNull(timesSquaredJobConf2.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE,
customTimesSquaredJobConf2.get(TEST_PROPERTY_KEY));
+
+ Configuration timesSquaredJobConf3 =
TimesSquaredJob.createTimesSquaredJobConf(v,
+
50,
+
inputPath,
+
outputPath,
+
TimesSquaredJob.TimesSquaredMapper.class,
+
TimesSquaredJob.VectorSummingReducer.class);
+ Configuration customTimesSquaredJobConf3 =
TimesSquaredJob.createTimesSquaredJobConf(initialConf,
+
v,
+
50,
+
inputPath,
+
outputPath,
+
TimesSquaredJob.TimesSquaredMapper.class,
+
TimesSquaredJob.VectorSummingReducer.class);
+
+ assertNull(timesSquaredJobConf3.get(TEST_PROPERTY_KEY));
+ assertEquals(TEST_PROPERTY_VALUE,
customTimesSquaredJobConf3.get(TEST_PROPERTY_KEY));
+ }
+
+ public Configuration createInitialConf() {
+ Configuration initialConf = new Configuration();
+ initialConf.set(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE);
+ return initialConf;
+ }
+
public DistributedRowMatrix randomDistributedMatrix(int numRows,
int nonNullRows,
int numCols,