Author: dlyubimov
Date: Wed Dec 21 04:08:04 2011
New Revision: 1221603
URL: http://svn.apache.org/viewvc?rev=1221603&view=rev
Log:
MAHOUT-922-2: SSVD broadcast option for B', R-hat matrices; making
--reduce-tasks non-optional in SSVD CLI
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
Wed Dec 21 04:08:04 2011
@@ -30,6 +30,7 @@ import com.google.common.collect.Iterato
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
@@ -45,9 +46,49 @@ import org.apache.mahout.common.Pair;
public final class SequenceFileDirIterator<K extends Writable,V extends
Writable>
extends ForwardingIterator<Pair<K,V>> implements Closeable {
- private final Iterator<Pair<K,V>> delegate;
+ private Iterator<Pair<K,V>> delegate;
private final List<SequenceFileIterator<K,V>> iterators;
+ /**
+ * Multifile sequence file iterator where files are specified explicitly by
+ * path parameters.
+ *
+ * @param path
+ * @param ordering
+ * @param reuseKeyValueInstances
+ * @param conf
+ * @throws IOException
+ */
+ public SequenceFileDirIterator(Path[] path,
+ Comparator<FileStatus> ordering,
+ final boolean reuseKeyValueInstances,
+ final Configuration conf) throws
IOException {
+
+ iterators = Lists.newArrayList();
+ /*
+ * we assume all files should exist, otherwise we will bail out.
+ */
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] statuses = new FileStatus[path.length];
+ for (int i = 0; i < statuses.length; i++)
+ statuses[i] = fs.getFileStatus(path[i]);
+ init(statuses, ordering, reuseKeyValueInstances, conf);
+ }
+
+ /**
+ * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+ * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate
over
+ * (depending on pathType parameter).
+ * <P>
+ *
+ * @param path
+ * @param pathType
+ * @param filter
+ * @param ordering
+ * @param reuseKeyValueInstances
+ * @param conf
+ * @throws IOException
+ */
public SequenceFileDirIterator(Path path,
PathType pathType,
PathFilter filter,
@@ -55,27 +96,47 @@ public final class SequenceFileDirIterat
final boolean reuseKeyValueInstances,
final Configuration conf) throws IOException {
-
- FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter,
ordering, conf);
- Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
-
+ FileStatus[] statuses =
+ HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
iterators = Lists.newArrayList();
+ init(statuses, ordering, reuseKeyValueInstances, conf);
+ }
- Iterator<Iterator<Pair<K,V>>> fsIterators =
- Iterators.transform(fileStatusIterator,
- new Function<FileStatus, Iterator<Pair<K, V>>>() {
- @Override
- public Iterator<Pair<K, V>> apply(FileStatus
from) {
- try {
- SequenceFileIterator<K,V> iterator =
- new
SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
- iterators.add(iterator);
- return iterator;
- } catch (IOException ioe) {
- throw new
IllegalStateException(from.getPath().toString(), ioe);
- }
+ private void init(FileStatus[] statuses,
+ Comparator<FileStatus> ordering,
+ final boolean reuseKeyValueInstances,
+ final Configuration conf) throws IOException {
+
+ /*
+ * prevent NPEs. Unfortunately, Hadoop would return null for list if
nothing
+ * was qualified. In this case, which is a corner case, we should assume an
+ * empty iterator, not an NPE.
+ */
+ if (statuses == null)
+ statuses = new FileStatus[0];
+
+ Iterator<FileStatus> fileStatusIterator =
+ Iterators.forArray(statuses == null ? new FileStatus[0] : statuses);
+
+ Iterator<Iterator<Pair<K, V>>> fsIterators =
+ Iterators.transform(fileStatusIterator,
+ new Function<FileStatus, Iterator<Pair<K, V>>>() {
+ @Override
+ public Iterator<Pair<K, V>> apply(FileStatus from)
{
+ try {
+ SequenceFileIterator<K, V> iterator =
+ new SequenceFileIterator<K,
V>(from.getPath(),
+
reuseKeyValueInstances,
+ conf);
+ iterators.add(iterator);
+ return iterator;
+ } catch (IOException ioe) {
+ throw new IllegalStateException(from.getPath()
+
.toString(),
+ ioe);
}
- });
+ }
+ });
Collections.reverse(iterators); // close later in reverse order
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
Wed Dec 21 04:08:04 2011
@@ -38,16 +38,31 @@ import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.IOUtils;
/**
- * Like {@link SequenceFileValueIterator}, but iterates not just over one
sequence file, but many. The input path
- * may be specified as a directory of files to read, or as a glob pattern. The
set of files may be optionally
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
* restricted with a {@link PathFilter}.
*/
-public final class SequenceFileDirValueIterator<V extends Writable>
- extends ForwardingIterator<V> implements Closeable {
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+ ForwardingIterator<V> implements Closeable {
- private final Iterator<V> delegate;
+ private Iterator<V> delegate;
private final List<SequenceFileValueIterator<V>> iterators;
+ /**
+ * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+ * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate
over
+ * (depending on pathType parameter).
+ * <P>
+ *
+ * @param path
+ * @param pathType
+ * @param filter
+ * @param ordering
+ * @param reuseKeyValueInstances
+ * @param conf
+ * @throws IOException
+ */
public SequenceFileDirValueIterator(Path path,
PathType pathType,
PathFilter filter,
@@ -55,38 +70,103 @@ public final class SequenceFileDirValueI
final boolean reuseKeyValueInstances,
final Configuration conf) throws
IOException {
FileStatus[] statuses;
- FileSystem fs = path.getFileSystem(conf);
+ FileSystem fs = FileSystem.get(conf);
if (filter == null) {
- statuses = pathType == PathType.GLOB ? fs.globStatus(path) :
fs.listStatus(path);
+ statuses =
+ pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
} else {
- statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) :
fs.listStatus(path, filter);
+ statuses =
+ pathType == PathType.GLOB ? fs.globStatus(path, filter)
+ : fs.listStatus(path, filter);
}
+ iterators = Lists.newArrayList();
+ init(statuses, ordering, reuseKeyValueInstances, conf);
+ }
+
+ /**
+ * Multifile sequence file iterator where files are specified explicitly by
+ * path parameters.
+ *
+ * @param path
+ * @param ordering
+ * @param reuseKeyValueInstances
+ * @param conf
+ * @throws IOException
+ */
+ public SequenceFileDirValueIterator(Path[] path,
+ Comparator<FileStatus> ordering,
+ final boolean reuseKeyValueInstances,
+ final Configuration conf) throws
IOException {
+
+ iterators = Lists.newArrayList();
+ /*
+ * we assume all files should exist, otherwise we will bail out.
+ */
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] statuses = new FileStatus[path.length];
+ for (int i = 0; i < statuses.length; i++)
+ statuses[i] = fs.getFileStatus(path[i]);
+ init(statuses, ordering, reuseKeyValueInstances, conf);
+ }
+
+ private void init(FileStatus[] statuses,
+ Comparator<FileStatus> ordering,
+ final boolean reuseKeyValueInstances,
+ final Configuration conf) throws IOException {
+
+ /*
+ * prevent NPEs. Unfortunately, Hadoop would return null for list if
nothing
+ * was qualified. In this case, which is a corner case, we should assume an
+ * empty iterator, not an NPE.
+ */
+ if (statuses == null)
+ statuses = new FileStatus[0];
+
if (ordering != null) {
Arrays.sort(statuses, ordering);
}
Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
- iterators = Lists.newArrayList();
+ boolean ok = false;
- Iterator<Iterator<V>> fsIterators =
+ try {
+
+ Iterator<Iterator<V>> fsIterators =
Iterators.transform(fileStatusIterator,
new Function<FileStatus, Iterator<V>>() {
@Override
public Iterator<V> apply(FileStatus from) {
try {
SequenceFileValueIterator<V> iterator =
- new
SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+ new
SequenceFileValueIterator<V>(from.getPath(),
+
reuseKeyValueInstances,
+ conf);
iterators.add(iterator);
return iterator;
} catch (IOException ioe) {
- throw new
IllegalStateException(from.getPath().toString(), ioe);
+ throw new
IllegalStateException(from.getPath()
+
.toString(),
+ ioe);
}
}
});
- Collections.reverse(iterators); // close later in reverse order
+ Collections.reverse(iterators); // close later in reverse order
+
+ delegate = Iterators.concat(fsIterators);
+ ok = true;
- delegate = Iterators.concat(fsIterators);
+ } finally {
+ /*
+ * prevent file handle leaks in case one of handles fails to open. If
some
+ * of the files fail to open, constructor will fail and close() will
never
+ * be called. Thus, those handles that did open in constructor, would
leak
+ * out, unless we specifically handle it here.
+ */
+
+ if (!ok)
+ IOUtils.close(iterators);
+ }
}
@Override
@@ -97,7 +177,6 @@ public final class SequenceFileDirValueI
@Override
public void close() throws IOException {
IOUtils.close(iterators);
- iterators.clear();
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
Wed Dec 21 04:08:04 2011
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -63,6 +65,7 @@ import org.apache.mahout.math.hadoop.sto
public class ABtDenseOutJob {
public static final String PROP_BT_PATH = "ssvd.Bt.path";
+ public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast";
private ABtDenseOutJob() {
}
@@ -94,6 +97,9 @@ public class ABtDenseOutJob {
private int aRowCount;
private int kp;
private int blockHeight;
+ private boolean distributedBt;
+ private Path[] btLocalPath;
+ private Configuration localFsConfig;
@Override
protected void map(Writable key, VectorWritable value, Context context)
@@ -114,8 +120,7 @@ public class ABtDenseOutJob {
aCols[i].setQuick(aRowCount, vec.getQuick(i));
}
} else if (vec.size() > 0) {
- for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); vecIter
- .hasNext();) {
+ for (Iterator<Vector.Element> vecIter = vec.iterateNonZero();
vecIter.hasNext();) {
Vector.Element vecEl = vecIter.next();
int i = vecEl.index();
extendAColIfNeeded(i, aRowCount + 1);
@@ -133,8 +138,7 @@ public class ABtDenseOutJob {
} else if (aCols[col].size() < rowCount) {
Vector newVec =
new SequentialAccessSparseVector(rowCount + blockHeight,
- aCols[col]
- .getNumNondefaultElements() << 1);
+
aCols[col].getNumNondefaultElements() << 1);
newVec.viewPart(0, aCols[col].size()).assign(aCols[col]);
aCols[col] = newVec;
}
@@ -176,15 +180,26 @@ public class ABtDenseOutJob {
*/
for (int pass = 0; pass < numPasses; pass++) {
- btInput =
- new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
-
PathType.GLOB,
- null,
- null,
- true,
- context
-
.getConfiguration());
+ if (distributedBt) {
+
+ btInput =
+ new SequenceFileDirIterator<IntWritable,
VectorWritable>(btLocalPath,
+ null,
+ true,
+
localFsConfig);
+
+ } else {
+
+ btInput =
+ new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
+
PathType.GLOB,
+ null,
+ null,
+ true,
+
context.getConfiguration());
+ }
closeables.addFirst(btInput);
+ Validate.isTrue(btInput.hasNext(), "Empty B' input!");
int aRowBegin = pass * blockHeight;
int bh = Math.min(blockHeight, aRowCount - aRowBegin);
@@ -217,8 +232,7 @@ public class ABtDenseOutJob {
continue;
}
int j = -1;
- for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero();
aColIter
- .hasNext();) {
+ for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero();
aColIter.hasNext();) {
Vector.Element aEl = aColIter.next();
j = aEl.index();
@@ -282,6 +296,15 @@ public class ABtDenseOutJob {
blockHeight =
context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT,
-1);
+ distributedBt = context.getConfiguration().get(PROP_BT_BROADCAST) !=
null;
+ if (distributedBt) {
+
+ btLocalPath =
+ DistributedCache.getLocalCacheFiles(context.getConfiguration());
+
+ localFsConfig = new Configuration();
+ localFsConfig.set("fs.default.name", "file:///");
+ }
}
}
@@ -303,8 +326,8 @@ public class ABtDenseOutJob {
* management completely and bypass MultipleOutputs entirely.
*/
- private static final NumberFormat NUMBER_FORMAT = NumberFormat
- .getInstance();
+ private static final NumberFormat NUMBER_FORMAT =
+ NumberFormat.getInstance();
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
@@ -393,8 +416,8 @@ public class ABtDenseOutJob {
String uniqueFileName = FileOutputFormat.getUniqueFile(context, name,
"");
uniqueFileName = uniqueFileName.replaceFirst("-r-", "-m-");
uniqueFileName =
- uniqueFileName.replaceFirst("\\d+$", Matcher
- .quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
+ uniqueFileName.replaceFirst("\\d+$",
+
Matcher.quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
return new Path(FileOutputFormat.getWorkOutputPath(context),
uniqueFileName);
}
@@ -454,8 +477,9 @@ public class ABtDenseOutJob {
int k,
int p,
int outerProdBlockHeight,
- int numReduceTasks) throws ClassNotFoundException,
- InterruptedException, IOException {
+ int numReduceTasks,
+ boolean broadcastBInput)
+ throws ClassNotFoundException, InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
@@ -492,6 +516,24 @@ public class ABtDenseOutJob {
job.setNumReduceTasks(numReduceTasks);
+ // broadcast Bt files if required.
+ if (broadcastBInput) {
+ job.getConfiguration().set(PROP_BT_BROADCAST, "y");
+
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] fstats = fs.globStatus(inputBtGlob);
+ if (fstats != null) {
+ for (FileStatus fstat : fstats) {
+ /*
+ * new api is not enabled yet in our dependencies at this time, still
+ * using deprecated one
+ */
+ DistributedCache.addCacheFile(fstat.getPath().toUri(),
+ job.getConfiguration());
+ }
+ }
+ }
+
job.submit();
job.waitForCompletion(false);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
Wed Dec 21 04:08:04 2011
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -63,6 +65,7 @@ import org.apache.mahout.math.hadoop.sto
public class ABtJob {
public static final String PROP_BT_PATH = "ssvd.Bt.path";
+ public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast";
private ABtJob() {
}
@@ -116,8 +119,7 @@ public class ABtJob {
aCols[i].setQuick(aRowCount, vec.getQuick(i));
}
} else {
- for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); vecIter
- .hasNext();) {
+ for (Iterator<Vector.Element> vecIter = vec.iterateNonZero();
vecIter.hasNext();) {
Vector.Element vecEl = vecIter.next();
int i = vecEl.index();
extendAColIfNeeded(i, aRowCount + 1);
@@ -130,13 +132,12 @@ public class ABtJob {
private void extendAColIfNeeded(int col, int rowCount) {
if (aCols[col] == null) {
aCols[col] =
- new SequentialAccessSparseVector(rowCount < 10000 ? 10000 :
rowCount,
- 16);
+ new SequentialAccessSparseVector(rowCount < 10000 ? 10000 : rowCount,
+ 1);
} else if (aCols[col].size() < rowCount) {
Vector newVec =
new SequentialAccessSparseVector(rowCount << 1,
- aCols[col]
- .getNumNondefaultElements() << 1);
+
aCols[col].getNumNondefaultElements() << 1);
newVec.viewPart(0, aCols[col].size()).assign(aCols[col]);
aCols[col] = newVec;
}
@@ -159,8 +160,7 @@ public class ABtJob {
continue;
}
int j = -1;
- for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero();
aColIter
- .hasNext(); ) {
+ for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero();
aColIter.hasNext();) {
Vector.Element aEl = aColIter.next();
j = aEl.index();
@@ -180,8 +180,7 @@ public class ABtJob {
// this happens in sparse matrices when last rows are all zeros
// and is subsequently causing shorter Q matrix row count which we
// probably don't want to repair there but rather here.
- Vector yDummy =
- new SequentialAccessSparseVector(kp);
+ Vector yDummy = new SequentialAccessSparseVector(kp);
// outValue.set(yDummy);
for (lastRowIndex += 1; lastRowIndex < aRowCount; lastRowIndex++) {
// outKey.setTaskItemOrdinal(lastRowIndex);
@@ -210,14 +209,42 @@ public class ABtJob {
Validate.notNull(propBtPathStr, "Bt input is not set");
Path btPath = new Path(propBtPathStr);
- btInput =
- new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
- PathType.GLOB,
- null,
- null,
- true,
- context
-
.getConfiguration());
+ boolean distributedBt =
+ context.getConfiguration().get(PROP_BT_BROADCAST) != null;
+
+ if (distributedBt) {
+
+ Path[] btFiles =
+ DistributedCache.getLocalCacheFiles(context.getConfiguration());
+
+ // DEBUG: stdout
+ System.out.printf("list of files: " + btFiles);
+
+ String btLocalPath = "";
+ for (Path btFile : btFiles) {
+ if (btLocalPath.length() > 0)
+ btLocalPath += Path.SEPARATOR_CHAR;
+ btLocalPath += btFile;
+ }
+
+ btInput =
+ new SequenceFileDirIterator<IntWritable, VectorWritable>(new
Path(btLocalPath),
+
PathType.LIST,
+ null,
+ null,
+ true,
+
context.getConfiguration());
+
+ } else {
+
+ btInput =
+ new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
+
PathType.GLOB,
+ null,
+ null,
+ true,
+
context.getConfiguration());
+ }
// TODO: how do i release all that stuff??
closeables.addFirst(btInput);
OutputCollector<LongWritable, SparseRowBlockWritable> yiBlockCollector =
@@ -261,8 +288,8 @@ public class ABtJob {
// management
// completely and bypass MultipleOutputs entirely.
- private static final NumberFormat NUMBER_FORMAT = NumberFormat
- .getInstance();
+ private static final NumberFormat NUMBER_FORMAT =
+ NumberFormat.getInstance();
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
@@ -342,8 +369,8 @@ public class ABtJob {
String uniqueFileName = FileOutputFormat.getUniqueFile(context, name,
"");
uniqueFileName = uniqueFileName.replaceFirst("-r-", "-m-");
uniqueFileName =
- uniqueFileName.replaceFirst("\\d+$", Matcher
- .quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
+ uniqueFileName.replaceFirst("\\d+$",
+
Matcher.quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
return new Path(FileOutputFormat.getWorkOutputPath(context),
uniqueFileName);
}
@@ -403,8 +430,9 @@ public class ABtJob {
int k,
int p,
int outerProdBlockHeight,
- int numReduceTasks) throws ClassNotFoundException,
- InterruptedException, IOException {
+ int numReduceTasks,
+ boolean broadcastBInput)
+ throws ClassNotFoundException, InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
@@ -459,6 +487,23 @@ public class ABtJob {
job.setNumReduceTasks(numReduceTasks);
+ // broadcast Bt files if required.
+ if (broadcastBInput) {
+ job.getConfiguration().set(PROP_BT_BROADCAST, "y");
+
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] fstats = fs.globStatus(inputBtGlob);
+ if (fstats != null) {
+ for (FileStatus fstat : fstats) {
+ /*
+ * new api is not enabled yet in our dependencies at this time, still
+ * using deprecated one
+ */
+ DistributedCache.addCacheFile(fstat.getPath().toUri(), conf);
+ }
+ }
+ }
+
job.submit();
job.waitForCompletion(false);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
Wed Dec 21 04:08:04 2011
@@ -25,6 +25,9 @@ import java.util.Iterator;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -82,6 +85,7 @@ public final class BtJob {
"ssvd.BtJob.outputBBtProducts";
public static final String PROP_OUTER_PROD_BLOCK_HEIGHT =
"ssvd.outerProdBlockHeight";
+ public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1;
@@ -137,8 +141,7 @@ public final class BtJob {
}
if (!aRow.isDense()) {
- for (Iterator<Vector.Element> iter = aRow.iterateNonZero(); iter
- .hasNext();) {
+ for (Iterator<Vector.Element> iter = aRow.iterateNonZero();
iter.hasNext();) {
Vector.Element el = iter.next();
double mul = el.get();
for (int j = 0; j < kp; j++) {
@@ -179,25 +182,51 @@ public final class BtJob {
SequenceFileValueIterator<DenseBlockWritable> qhatInput =
new SequenceFileValueIterator<DenseBlockWritable>(qInputPath,
true,
- context
-
.getConfiguration());
+
context.getConfiguration());
closeables.addFirst(qhatInput);
/*
* read all r files _in order of task ids_, i.e. partitions (aka group
- * nums)
+ * nums).
+ *
+ * Note: if broadcast option is used, this comes from distributed cache
+ * files rather than hdfs path.
*/
- Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
+ SequenceFileDirValueIterator<VectorWritable> rhatInput;
+
+ boolean distributedRHat =
+ context.getConfiguration().get(PROP_RHAT_BROADCAST) != null;
+ if (distributedRHat) {
+
+ Path[] rFiles =
+ DistributedCache.getLocalCacheFiles(context.getConfiguration());
+
+ Validate.notNull(rFiles,
+ "no RHat files in distributed cache job definition");
+
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+
+ rhatInput =
+ new SequenceFileDirValueIterator<VectorWritable>(rFiles,
+
SSVDSolver.PARTITION_COMPARATOR,
+ true,
+ conf);
+
+ } else {
+ Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
+ rhatInput =
+ new SequenceFileDirValueIterator<VectorWritable>(rPath,
+ PathType.GLOB,
+ null,
+
SSVDSolver.PARTITION_COMPARATOR,
+ true,
+
context.getConfiguration());
+ }
+
+ Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!");
- SequenceFileDirValueIterator<VectorWritable> rhatInput =
- new SequenceFileDirValueIterator<VectorWritable>(rPath,
- PathType.GLOB,
- null,
-
SSVDSolver.PARTITION_COMPARATOR,
- true,
- context
-
.getConfiguration());
closeables.addFirst(rhatInput);
outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
closeables.addFirst(new
IOUtils.MultipleOutputsCloseableAdapter(outputs));
@@ -230,7 +259,9 @@ public final class BtJob {
btCollector =
new SparseRowBlockAccumulator(context.getConfiguration()
- .getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1), btBlockCollector);
+
.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT,
+ -1),
+ btBlockCollector);
closeables.addFirst(btCollector);
}
@@ -304,8 +335,7 @@ public final class BtJob {
mBBt = new UpperTriangular(k + p);
outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
- closeables
- .addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+ closeables.addFirst(new
IOUtils.MultipleOutputsCloseableAdapter(outputs));
}
}
@@ -363,9 +393,8 @@ public final class BtJob {
OutputCollector<Writable, Writable> collector =
outputs.getCollector(OUTPUT_BBT, null);
- collector
- .collect(new IntWritable(),
- new VectorWritable(new DenseVector(mBBt.getData())));
+ collector.collect(new IntWritable(),
+ new VectorWritable(new
DenseVector(mBBt.getData())));
}
} finally {
IOUtils.close(closeables);
@@ -384,26 +413,25 @@ public final class BtJob {
int p,
int btBlockHeight,
int numReduceTasks,
+ boolean broadcast,
Class<? extends Writable> labelClass,
boolean outputBBtProducts)
throws ClassNotFoundException, InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
- MultipleOutputs
- .addNamedOutput(oldApiJob,
- OUTPUT_Q,
- org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
- labelClass,
- VectorWritable.class);
+ MultipleOutputs.addNamedOutput(oldApiJob,
+ OUTPUT_Q,
+
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ labelClass,
+ VectorWritable.class);
if (outputBBtProducts) {
- MultipleOutputs
- .addNamedOutput(oldApiJob,
- OUTPUT_BBT,
-
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
- IntWritable.class,
- VectorWritable.class);
+ MultipleOutputs.addNamedOutput(oldApiJob,
+ OUTPUT_BBT,
+
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+ IntWritable.class,
+ VectorWritable.class);
}
/*
@@ -450,6 +478,30 @@ public final class BtJob {
job.setNumReduceTasks(numReduceTasks);
+ /*
+ * we can broadcast Rhat files since all of them are required by each job,
+ * but not Q files which correspond to splits of A (so each split of A will
+ * require only particular Q file, each time different one).
+ */
+
+ if (broadcast) {
+ job.getConfiguration().set(PROP_RHAT_BROADCAST, "y");
+
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] fstats =
+ fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*"));
+ if (fstats != null) {
+ for (FileStatus fstat : fstats) {
+ /*
+ * new api is not enabled yet in our dependencies at this time, still
+ * using deprecated one
+ */
+ DistributedCache.addCacheFile(fstat.getPath().toUri(),
+ job.getConfiguration());
+ }
+ }
+ }
+
job.submit();
job.waitForCompletion(false);
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
Wed Dec 21 04:08:04 2011
@@ -45,12 +45,15 @@ public class SSVDCli extends AbstractJob
addOutputOption();
addOption("rank", "k", "decomposition rank", true);
addOption("oversampling", "p", "oversampling", String.valueOf(15));
- addOption("blockHeight", "r", "Y block height (must be > (k+p))",
String.valueOf(10000));
+ addOption("blockHeight",
+ "r",
+ "Y block height (must be > (k+p))",
+ String.valueOf(10000));
addOption("outerProdBlockHeight",
"oh",
"block height of outer products during multiplication, increase
for sparse inputs",
String.valueOf(30000));
- addOption("abtBlockHeight",
+ addOption("abtBlockHeight",
"abth",
"block height of Y_i in ABtJob during AB' multiplication,
increase for extremely sparse inputs",
String.valueOf(200000));
@@ -68,11 +71,15 @@ public class SSVDCli extends AbstractJob
addOption("reduceTasks",
"t",
"number of reduce tasks (where applicable)",
- String.valueOf(1));
+ true);
addOption("powerIter",
"q",
"number of additional power iterations (0..2 is good)",
String.valueOf(0));
+ addOption("broadcast",
+ "br",
+ "whether use distributed cache to broadcast matrices wherever
possible",
+ String.valueOf(true));
addOption(DefaultOptionCreator.overwriteOption().create());
Map<String, String> pargs = parseArguments(args);
@@ -92,6 +99,7 @@ public class SSVDCli extends AbstractJob
boolean cUHalfSigma = Boolean.parseBoolean(pargs.get("--uHalfSigma"));
boolean cVHalfSigma = Boolean.parseBoolean(pargs.get("--vHalfSigma"));
int reduceTasks = Integer.parseInt(pargs.get("--reduceTasks"));
+ boolean broadcast = Boolean.parseBoolean(pargs.get("--broadcast"));
boolean overwrite =
pargs.containsKey(keyFor(DefaultOptionCreator.OVERWRITE_OPTION));
@@ -116,6 +124,7 @@ public class SSVDCli extends AbstractJob
solver.setOuterBlockHeight(h);
solver.setAbtBlockHeight(abh);
solver.setQ(q);
+ solver.setBroadcast(broadcast);
solver.setOverwrite(overwrite);
solver.run();
@@ -128,14 +137,16 @@ public class SSVDCli extends AbstractJob
SequenceFile.Writer sigmaW = null;
try {
- sigmaW = SequenceFile.createWriter(fs,
- conf,
- getOutputPath("sigma"),
- NullWritable.class,
- VectorWritable.class);
+ sigmaW =
+ SequenceFile.createWriter(fs,
+ conf,
+ getOutputPath("sigma"),
+ NullWritable.class,
+ VectorWritable.class);
Writable sValues =
- new VectorWritable(new DenseVector(Arrays.copyOf(solver
- .getSingularValues(), k), true));
+ new VectorWritable(new
DenseVector(Arrays.copyOf(solver.getSingularValues(),
+ k),
+ true));
sigmaW.append(NullWritable.get(), sValues);
} finally {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
Wed Dec 21 04:08:04 2011
@@ -117,6 +117,7 @@ public class SSVDSolver {
private boolean cUHalfSigma;
private boolean cVHalfSigma;
private boolean overwrite;
+ private boolean broadcast = true;
/**
* create new SSVD solver. Required parameters are passed to constructor to
@@ -284,6 +285,20 @@ public class SSVDSolver {
this.abtBlockHeight = abtBlockHeight;
}
+ public boolean isBroadcast() {
+ return broadcast;
+ }
+
+ /**
+ * If this property is true, use DistributedCache mechanism to broadcast some
+ * stuff around. May improve efficiency. Default is true.
+ *
+ * @param broadcast
+ */
+ public void setBroadcast(boolean broadcast) {
+ this.broadcast = broadcast;
+ }
+
/**
* run all SSVD jobs.
*
@@ -292,7 +307,7 @@ public class SSVDSolver {
*/
public void run() throws IOException {
- Deque<Closeable> closeables = Lists.<Closeable>newLinkedList();
+ Deque<Closeable> closeables = Lists.<Closeable> newLinkedList();
try {
Class<? extends Writable> labelType =
sniffInputLabelType(inputPath, conf);
@@ -322,8 +337,12 @@ public class SSVDSolver {
seed,
reduceTasks);
- // restrict number of reducers to a reasonable number
- // so we don't have to run too many additions in the frontend.
+ /*
+ * restrict number of reducers to a reasonable number so we don't have to
+ * run too many additions in the frontend when reconstructing BBt for the
+ * last B' and BB' computations. The user may not realize that and give a
+ * bit too many (I would be happy if that were ever the case though).
+ */
BtJob.run(conf,
inputPath,
@@ -333,7 +352,8 @@ public class SSVDSolver {
k,
p,
outerBlockHeight,
- Math.min(1000, reduceTasks),
+ q <= 0 ? Math.min(1000, reduceTasks) : reduceTasks,
+ broadcast,
labelType,
q <= 0);
@@ -351,7 +371,8 @@ public class SSVDSolver {
k,
p,
abtBlockHeight,
- reduceTasks);
+ reduceTasks,
+ broadcast);
btPath = new Path(outputPath, String.format("Bt-job-%d", i + 1));
@@ -363,7 +384,8 @@ public class SSVDSolver {
k,
p,
outerBlockHeight,
- Math.min(1000, reduceTasks),
+ i == q - 1 ? Math.min(1000, reduceTasks) : reduceTasks,
+ broadcast,
labelType,
i == q - 1);
}
@@ -499,7 +521,8 @@ public class SSVDSolver {
if (!fstats[0].isDir()) {
firstSeqFile = fstats[0];
} else {
- firstSeqFile = fs.listStatus(fstats[0].getPath(),
PathFilters.logsCRCFilter())[0];
+ firstSeqFile =
+ fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0];
}
SequenceFile.Reader r = null;
@@ -566,7 +589,6 @@ public class SSVDSolver {
List<double[]> denseData = Lists.newArrayList();
-
/*
* assume it is partitioned output, so we need to read them up in order of
* partitions.
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
Wed Dec 21 04:08:04 2011
@@ -132,6 +132,7 @@ public class LocalSSVDSolverDenseTest ex
ssvd.setAbtBlockHeight(400);
ssvd.setOverwrite(true);
ssvd.setQ(q);
+ ssvd.setBroadcast(false);
ssvd.run();
double[] stochasticSValues = ssvd.getSingularValues();
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
Wed Dec 21 04:08:04 2011
@@ -152,6 +152,7 @@ public class LocalSSVDSolverSparseSequen
ssvd.setOverwrite(true);
ssvd.setQ(q);
+ ssvd.setBroadcast(true);
ssvd.run();
double[] stochasticSValues = ssvd.getSingularValues();