Author: dlyubimov
Date: Wed Dec 21 04:08:04 2011
New Revision: 1221603

URL: http://svn.apache.org/viewvc?rev=1221603&view=rev
Log:
MAHOUT-922-2: add an SSVD option to broadcast the B' and R-hat matrices via the
distributed cache; make --reduce-tasks a required option in the SSVD CLI

Modified:
    
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
    
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
    
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
    
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
 Wed Dec 21 04:08:04 2011
@@ -30,6 +30,7 @@ import com.google.common.collect.Iterato
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
@@ -45,9 +46,49 @@ import org.apache.mahout.common.Pair;
 public final class SequenceFileDirIterator<K extends Writable,V extends 
Writable>
     extends ForwardingIterator<Pair<K,V>> implements Closeable {
 
-  private final Iterator<Pair<K,V>> delegate;
+  private Iterator<Pair<K,V>> delegate;
   private final List<SequenceFileIterator<K,V>> iterators;
 
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters. All files must exist.
+   *
+   * @param path files to iterate over
+   * @param ordering optional comparator the files are sorted by; may be null
+   * @param reuseKeyValueInstances whether key/value instances are reused
+   * @param conf configuration used to resolve file systems and open the files
+   * @throws IOException if a file is missing or cannot be accessed
+   */
+  public SequenceFileDirIterator(Path[] path,
+                                 Comparator<FileStatus> ordering,
+                                 final boolean reuseKeyValueInstances,
+                                 final Configuration conf) throws IOException {
+    iterators = Lists.newArrayList();
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    // init() iterates the array as given, so honour the requested ordering here
+    if (ordering != null) {
+      java.util.Arrays.sort(statuses, ordering);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate 
over
+   * (depending on pathType parameter).
+   * <P>
+   * 
+   * @param path
+   * @param pathType
+   * @param filter
+   * @param ordering
+   * @param reuseKeyValueInstances
+   * @param conf
+   * @throws IOException
+   */
   public SequenceFileDirIterator(Path path,
                                  PathType pathType,
                                  PathFilter filter,
@@ -55,27 +96,47 @@ public final class SequenceFileDirIterat
                                  final boolean reuseKeyValueInstances,
                                  final Configuration conf) throws IOException {
 
-
-    FileStatus[] statuses = HadoopUtil.getFileStatus(path, pathType, filter, 
ordering, conf);
-    Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
-
+    FileStatus[] statuses =
+      HadoopUtil.getFileStatus(path, pathType, filter, ordering, conf);
     iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
 
-    Iterator<Iterator<Pair<K,V>>> fsIterators =
-        Iterators.transform(fileStatusIterator,
-                            new Function<FileStatus, Iterator<Pair<K, V>>>() {
-                              @Override
-                              public Iterator<Pair<K, V>> apply(FileStatus 
from) {
-                                try {
-                                  SequenceFileIterator<K,V> iterator =
-                                      new 
SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
-                                  iterators.add(iterator);
-                                  return iterator;
-                                } catch (IOException ioe) {
-                                  throw new 
IllegalStateException(from.getPath().toString(), ioe);
-                                }
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) throws IOException {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if 
nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null)
+      statuses = new FileStatus[0];
+
+    Iterator<FileStatus> fileStatusIterator =
+      Iterators.forArray(statuses == null ? new FileStatus[0] : statuses);
+
+    Iterator<Iterator<Pair<K, V>>> fsIterators =
+      Iterators.transform(fileStatusIterator,
+                          new Function<FileStatus, Iterator<Pair<K, V>>>() {
+                            @Override
+                            public Iterator<Pair<K, V>> apply(FileStatus from) 
{
+                              try {
+                                SequenceFileIterator<K, V> iterator =
+                                  new SequenceFileIterator<K, 
V>(from.getPath(),
+                                                                 
reuseKeyValueInstances,
+                                                                 conf);
+                                iterators.add(iterator);
+                                return iterator;
+                              } catch (IOException ioe) {
+                                throw new IllegalStateException(from.getPath()
+                                                                    
.toString(),
+                                                                ioe);
                               }
-                            });
+                            }
+                          });
 
     Collections.reverse(iterators); // close later in reverse order
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
 Wed Dec 21 04:08:04 2011
@@ -38,16 +38,31 @@ import org.apache.hadoop.io.Writable;
 import org.apache.mahout.common.IOUtils;
 
 /**
- * Like {@link SequenceFileValueIterator}, but iterates not just over one 
sequence file, but many. The input path
- * may be specified as a directory of files to read, or as a glob pattern. The 
set of files may be optionally
+ * Like {@link SequenceFileValueIterator}, but iterates not just over one
+ * sequence file, but many. The input path may be specified as a directory of
+ * files to read, or as a glob pattern. The set of files may be optionally
  * restricted with a {@link PathFilter}.
  */
-public final class SequenceFileDirValueIterator<V extends Writable>
-    extends ForwardingIterator<V> implements Closeable {
+public final class SequenceFileDirValueIterator<V extends Writable> extends
+    ForwardingIterator<V> implements Closeable {
 
-  private final Iterator<V> delegate;
+  private Iterator<V> delegate;
   private final List<SequenceFileValueIterator<V>> iterators;
 
+  /**
+   * Constructor that uses either {@link FileSystem#listStatus(Path)} or
+   * {@link FileSystem#globStatus(Path)} to obtain list of files to iterate 
over
+   * (depending on pathType parameter).
+   * <P>
+   * 
+   * @param path
+   * @param pathType
+   * @param filter
+   * @param ordering
+   * @param reuseKeyValueInstances
+   * @param conf
+   * @throws IOException
+   */
   public SequenceFileDirValueIterator(Path path,
                                       PathType pathType,
                                       PathFilter filter,
@@ -55,38 +70,103 @@ public final class SequenceFileDirValueI
                                       final boolean reuseKeyValueInstances,
                                       final Configuration conf) throws 
IOException {
     FileStatus[] statuses;
-    FileSystem fs = path.getFileSystem(conf);
+    FileSystem fs = FileSystem.get(conf);
     if (filter == null) {
-      statuses = pathType == PathType.GLOB ? fs.globStatus(path) : 
fs.listStatus(path);
+      statuses =
+        pathType == PathType.GLOB ? fs.globStatus(path) : fs.listStatus(path);
     } else {
-      statuses = pathType == PathType.GLOB ? fs.globStatus(path, filter) : 
fs.listStatus(path, filter);
+      statuses =
+        pathType == PathType.GLOB ? fs.globStatus(path, filter)
+            : fs.listStatus(path, filter);
     }
+    iterators = Lists.newArrayList();
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  /**
+   * Multifile sequence file iterator where files are specified explicitly by
+   * path parameters. All files must exist.
+   *
+   * @param path files to iterate over
+   * @param ordering optional comparator the files are sorted by; may be null
+   * @param reuseKeyValueInstances whether value instances are reused
+   * @param conf configuration used to resolve file systems and open the files
+   * @throws IOException if a file is missing or cannot be accessed
+   */
+  public SequenceFileDirValueIterator(Path[] path,
+                                      Comparator<FileStatus> ordering,
+                                      final boolean reuseKeyValueInstances,
+                                      final Configuration conf) throws IOException {
+    iterators = Lists.newArrayList();
+    /*
+     * every file must exist, otherwise getFileStatus() bails out. Each path
+     * is resolved against its own file system rather than the default one.
+     */
+    FileStatus[] statuses = new FileStatus[path.length];
+    for (int i = 0; i < statuses.length; i++) {
+      statuses[i] = path[i].getFileSystem(conf).getFileStatus(path[i]);
+    }
+    init(statuses, ordering, reuseKeyValueInstances, conf);
+  }
+
+  private void init(FileStatus[] statuses,
+                    Comparator<FileStatus> ordering,
+                    final boolean reuseKeyValueInstances,
+                    final Configuration conf) throws IOException {
+
+    /*
+     * prevent NPEs. Unfortunately, Hadoop would return null for list if 
nothing
+     * was qualified. In this case, which is a corner case, we should assume an
+     * empty iterator, not an NPE.
+     */
+    if (statuses == null)
+      statuses = new FileStatus[0];
+
     if (ordering != null) {
       Arrays.sort(statuses, ordering);
     }
     Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
 
-    iterators = Lists.newArrayList();
+    boolean ok = false;
 
-    Iterator<Iterator<V>> fsIterators =
+    try {
+
+      Iterator<Iterator<V>> fsIterators =
         Iterators.transform(fileStatusIterator,
                             new Function<FileStatus, Iterator<V>>() {
                               @Override
                               public Iterator<V> apply(FileStatus from) {
                                 try {
                                   SequenceFileValueIterator<V> iterator =
-                                    new 
SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+                                    new 
SequenceFileValueIterator<V>(from.getPath(),
+                                                                     
reuseKeyValueInstances,
+                                                                     conf);
                                   iterators.add(iterator);
                                   return iterator;
                                 } catch (IOException ioe) {
-                                  throw new 
IllegalStateException(from.getPath().toString(), ioe);
+                                  throw new 
IllegalStateException(from.getPath()
+                                                                      
.toString(),
+                                                                  ioe);
                                 }
                               }
                             });
 
-    Collections.reverse(iterators); // close later in reverse order
+      Collections.reverse(iterators); // close later in reverse order
+
+      delegate = Iterators.concat(fsIterators);
+      ok = true;
 
-    delegate = Iterators.concat(fsIterators);
+    } finally {
+      /*
+       * prevent file handle leaks in case one of handles fails to open. If 
some
+       * of the files fail to open, constructor will fail and close() will 
never
+       * be called. Thus, those handles that did open in constructor, would 
leak
+       * out, unless we specifically handle it here.
+       */
+
+      if (!ok)
+        IOUtils.close(iterators);
+    }
   }
 
   @Override
@@ -97,7 +177,6 @@ public final class SequenceFileDirValueI
   @Override
   public void close() throws IOException {
     IOUtils.close(iterators);
-    iterators.clear();
   }
 
 }

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtDenseOutJob.java
 Wed Dec 21 04:08:04 2011
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -63,6 +65,7 @@ import org.apache.mahout.math.hadoop.sto
 public class ABtDenseOutJob {
 
   public static final String PROP_BT_PATH = "ssvd.Bt.path";
+  public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast";
 
   private ABtDenseOutJob() {
   }
@@ -94,6 +97,9 @@ public class ABtDenseOutJob {
     private int aRowCount;
     private int kp;
     private int blockHeight;
+    private boolean distributedBt;
+    private Path[] btLocalPath;
+    private Configuration localFsConfig;
 
     @Override
     protected void map(Writable key, VectorWritable value, Context context)
@@ -114,8 +120,7 @@ public class ABtDenseOutJob {
           aCols[i].setQuick(aRowCount, vec.getQuick(i));
         }
       } else if (vec.size() > 0) {
-        for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); vecIter
-          .hasNext();) {
+        for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); 
vecIter.hasNext();) {
           Vector.Element vecEl = vecIter.next();
           int i = vecEl.index();
           extendAColIfNeeded(i, aRowCount + 1);
@@ -133,8 +138,7 @@ public class ABtDenseOutJob {
       } else if (aCols[col].size() < rowCount) {
         Vector newVec =
           new SequentialAccessSparseVector(rowCount + blockHeight,
-                                           aCols[col]
-                                             .getNumNondefaultElements() << 1);
+                                           
aCols[col].getNumNondefaultElements() << 1);
         newVec.viewPart(0, aCols[col].size()).assign(aCols[col]);
         aCols[col] = newVec;
       }
@@ -176,15 +180,26 @@ public class ABtDenseOutJob {
          */
         for (int pass = 0; pass < numPasses; pass++) {
 
-          btInput =
-            new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
-                                                                     
PathType.GLOB,
-                                                                     null,
-                                                                     null,
-                                                                     true,
-                                                                     context
-                                                                       
.getConfiguration());
+          if (distributedBt) {
+
+            btInput =
+              new SequenceFileDirIterator<IntWritable, 
VectorWritable>(btLocalPath,
+                                                                       null,
+                                                                       true,
+                                                                       
localFsConfig);
+
+          } else {
+
+            btInput =
+              new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
+                                                                       
PathType.GLOB,
+                                                                       null,
+                                                                       null,
+                                                                       true,
+                                                                       
context.getConfiguration());
+          }
           closeables.addFirst(btInput);
+          Validate.isTrue(btInput.hasNext(), "Empty B' input!");
 
           int aRowBegin = pass * blockHeight;
           int bh = Math.min(blockHeight, aRowCount - aRowBegin);
@@ -217,8 +232,7 @@ public class ABtDenseOutJob {
               continue;
             }
             int j = -1;
-            for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero(); 
aColIter
-              .hasNext();) {
+            for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero(); 
aColIter.hasNext();) {
               Vector.Element aEl = aColIter.next();
               j = aEl.index();
 
@@ -282,6 +296,15 @@ public class ABtDenseOutJob {
       blockHeight =
         context.getConfiguration().getInt(BtJob.PROP_OUTER_PROD_BLOCK_HEIGHT,
                                           -1);
+      distributedBt = context.getConfiguration().get(PROP_BT_BROADCAST) != null;
+      if (distributedBt) {
+        btLocalPath =
+          DistributedCache.getLocalCacheFiles(context.getConfiguration());
+        Validate.notNull(btLocalPath,
+                         "no B' files in distributed cache job definition");
+        localFsConfig = new Configuration();
+        localFsConfig.set("fs.default.name", "file:///");
+      }
 
     }
   }
@@ -303,8 +326,8 @@ public class ABtDenseOutJob {
      * management completely and bypass MultipleOutputs entirely.
      */
 
-    private static final NumberFormat NUMBER_FORMAT = NumberFormat
-      .getInstance();
+    private static final NumberFormat NUMBER_FORMAT =
+      NumberFormat.getInstance();
     static {
       NUMBER_FORMAT.setMinimumIntegerDigits(5);
       NUMBER_FORMAT.setGroupingUsed(false);
@@ -393,8 +416,8 @@ public class ABtDenseOutJob {
       String uniqueFileName = FileOutputFormat.getUniqueFile(context, name, 
"");
       uniqueFileName = uniqueFileName.replaceFirst("-r-", "-m-");
       uniqueFileName =
-        uniqueFileName.replaceFirst("\\d+$", Matcher
-          .quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
+        uniqueFileName.replaceFirst("\\d+$",
+                                    
Matcher.quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
       return new Path(FileOutputFormat.getWorkOutputPath(context),
                       uniqueFileName);
     }
@@ -454,8 +477,9 @@ public class ABtDenseOutJob {
                          int k,
                          int p,
                          int outerProdBlockHeight,
-                         int numReduceTasks) throws ClassNotFoundException,
-    InterruptedException, IOException {
+                         int numReduceTasks,
+                         boolean broadcastBInput)
+    throws ClassNotFoundException, InterruptedException, IOException {
 
     JobConf oldApiJob = new JobConf(conf);
 
@@ -492,6 +516,24 @@ public class ABtDenseOutJob {
 
     job.setNumReduceTasks(numReduceTasks);
 
+    // broadcast Bt files if required.
+    if (broadcastBInput) {
+      job.getConfiguration().set(PROP_BT_BROADCAST, "y");
+      // resolve the glob against its own file system, not the default one
+      FileSystem fs = inputBtGlob.getFileSystem(conf);
+      FileStatus[] fstats = fs.globStatus(inputBtGlob);
+      if (fstats != null) {
+        for (FileStatus fstat : fstats) {
+          /*
+           * new api is not enabled yet in our dependencies at this time, still
+           * using deprecated one
+           */
+          DistributedCache.addCacheFile(fstat.getPath().toUri(),
+                                        job.getConfiguration());
+        }
+      }
+    }
+
     job.submit();
     job.waitForCompletion(false);
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/ABtJob.java
 Wed Dec 21 04:08:04 2011
@@ -29,6 +29,8 @@ import java.util.regex.Matcher;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -63,6 +65,7 @@ import org.apache.mahout.math.hadoop.sto
 public class ABtJob {
 
   public static final String PROP_BT_PATH = "ssvd.Bt.path";
+  public static final String PROP_BT_BROADCAST = "ssvd.Bt.broadcast";
 
   private ABtJob() {
   }
@@ -116,8 +119,7 @@ public class ABtJob {
           aCols[i].setQuick(aRowCount, vec.getQuick(i));
         }
       } else {
-        for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); vecIter
-          .hasNext();) {
+        for (Iterator<Vector.Element> vecIter = vec.iterateNonZero(); 
vecIter.hasNext();) {
           Vector.Element vecEl = vecIter.next();
           int i = vecEl.index();
           extendAColIfNeeded(i, aRowCount + 1);
@@ -130,13 +132,12 @@ public class ABtJob {
     private void extendAColIfNeeded(int col, int rowCount) {
       if (aCols[col] == null) {
         aCols[col] =
-            new SequentialAccessSparseVector(rowCount < 10000 ? 10000 : 
rowCount,
-                                             16);
+          new SequentialAccessSparseVector(rowCount < 10000 ? 10000 : rowCount,
+                                           1);
       } else if (aCols[col].size() < rowCount) {
         Vector newVec =
           new SequentialAccessSparseVector(rowCount << 1,
-                                           aCols[col]
-                                             .getNumNondefaultElements() << 1);
+                                           
aCols[col].getNumNondefaultElements() << 1);
         newVec.viewPart(0, aCols[col].size()).assign(aCols[col]);
         aCols[col] = newVec;
       }
@@ -159,8 +160,7 @@ public class ABtJob {
             continue;
           }
           int j = -1;
-          for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero(); 
aColIter
-              .hasNext(); ) {
+          for (Iterator<Vector.Element> aColIter = aCol.iterateNonZero(); 
aColIter.hasNext();) {
             Vector.Element aEl = aColIter.next();
             j = aEl.index();
 
@@ -180,8 +180,7 @@ public class ABtJob {
         // this happens in sparse matrices when last rows are all zeros
         // and is subsequently causing shorter Q matrix row count which we
         // probably don't want to repair there but rather here.
-        Vector yDummy =
-          new SequentialAccessSparseVector(kp);
+        Vector yDummy = new SequentialAccessSparseVector(kp);
         // outValue.set(yDummy);
         for (lastRowIndex += 1; lastRowIndex < aRowCount; lastRowIndex++) {
           // outKey.setTaskItemOrdinal(lastRowIndex);
@@ -210,14 +209,42 @@ public class ABtJob {
       Validate.notNull(propBtPathStr, "Bt input is not set");
       Path btPath = new Path(propBtPathStr);
 
-      btInput =
-        new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
-                                                                 PathType.GLOB,
-                                                                 null,
-                                                                 null,
-                                                                 true,
-                                                                 context
-                                                                   
.getConfiguration());
+      boolean distributedBt =
+        context.getConfiguration().get(PROP_BT_BROADCAST) != null;
+
+      if (distributedBt) {
+
+        /*
+         * B' files were shipped through the distributed cache. Each local
+         * copy must be passed as its own Path: joining them into a single
+         * path string does not produce a usable file list.
+         */
+        Path[] btFiles =
+          DistributedCache.getLocalCacheFiles(context.getConfiguration());
+        Validate.notNull(btFiles,
+                         "no B' files in distributed cache job definition");
+
+        // the cached copies live on the task-local file system
+        Configuration localFsConfig = new Configuration();
+        localFsConfig.set("fs.default.name", "file:///");
+
+        btInput =
+          new SequenceFileDirIterator<IntWritable, VectorWritable>(btFiles,
+                                                                   null,
+                                                                   true,
+                                                                   localFsConfig);
+
+      } else {
+
+        // read B' straight from the configured glob
+        btInput =
+          new SequenceFileDirIterator<IntWritable, VectorWritable>(btPath,
+                                                                   PathType.GLOB,
+                                                                   null,
+                                                                   null,
+                                                                   true,
+                                                                   context.getConfiguration());
+      }
       // TODO: how do i release all that stuff??
       closeables.addFirst(btInput);
       OutputCollector<LongWritable, SparseRowBlockWritable> yiBlockCollector =
@@ -261,8 +288,8 @@ public class ABtJob {
     // management
     // completely and bypass MultipleOutputs entirely.
 
-    private static final NumberFormat NUMBER_FORMAT = NumberFormat
-      .getInstance();
+    private static final NumberFormat NUMBER_FORMAT =
+      NumberFormat.getInstance();
     static {
       NUMBER_FORMAT.setMinimumIntegerDigits(5);
       NUMBER_FORMAT.setGroupingUsed(false);
@@ -342,8 +369,8 @@ public class ABtJob {
       String uniqueFileName = FileOutputFormat.getUniqueFile(context, name, 
"");
       uniqueFileName = uniqueFileName.replaceFirst("-r-", "-m-");
       uniqueFileName =
-        uniqueFileName.replaceFirst("\\d+$", Matcher
-          .quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
+        uniqueFileName.replaceFirst("\\d+$",
+                                    
Matcher.quoteReplacement(NUMBER_FORMAT.format(spw.getTaskId())));
       return new Path(FileOutputFormat.getWorkOutputPath(context),
                       uniqueFileName);
     }
@@ -403,8 +430,9 @@ public class ABtJob {
                          int k,
                          int p,
                          int outerProdBlockHeight,
-                         int numReduceTasks) throws ClassNotFoundException,
-    InterruptedException, IOException {
+                         int numReduceTasks,
+                         boolean broadcastBInput)
+    throws ClassNotFoundException, InterruptedException, IOException {
 
     JobConf oldApiJob = new JobConf(conf);
 
@@ -459,6 +487,23 @@ public class ABtJob {
 
     job.setNumReduceTasks(numReduceTasks);
 
+    // broadcast Bt files if required.
+    if (broadcastBInput) {
+      job.getConfiguration().set(PROP_BT_BROADCAST, "y");
+      FileSystem fs = inputBtGlob.getFileSystem(conf);
+      FileStatus[] fstats = fs.globStatus(inputBtGlob);
+      if (fstats != null) {
+        for (FileStatus fstat : fstats) {
+          /*
+           * Deprecated API (the new one is not in our dependencies yet). The
+           * job was built from a copy of conf, so register with its own config.
+           */
+          DistributedCache.addCacheFile(fstat.getPath().toUri(),
+                                        job.getConfiguration());
+        }
+      }
+    }
+
     job.submit();
     job.waitForCompletion(false);
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java
 Wed Dec 21 04:08:04 2011
@@ -25,6 +25,9 @@ import java.util.Iterator;
 
 import org.apache.commons.lang.Validate;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -82,6 +85,7 @@ public final class BtJob {
     "ssvd.BtJob.outputBBtProducts";
   public static final String PROP_OUTER_PROD_BLOCK_HEIGHT =
     "ssvd.outerProdBlockHeight";
+  public static final String PROP_RHAT_BROADCAST = "ssvd.rhat.broadcast";
 
   static final double SPARSE_ZEROS_PCT_THRESHOLD = 0.1;
 
@@ -137,8 +141,7 @@ public final class BtJob {
       }
 
       if (!aRow.isDense()) {
-        for (Iterator<Vector.Element> iter = aRow.iterateNonZero(); iter
-          .hasNext();) {
+        for (Iterator<Vector.Element> iter = aRow.iterateNonZero(); 
iter.hasNext();) {
           Vector.Element el = iter.next();
           double mul = el.get();
           for (int j = 0; j < kp; j++) {
@@ -179,25 +182,51 @@ public final class BtJob {
       SequenceFileValueIterator<DenseBlockWritable> qhatInput =
         new SequenceFileValueIterator<DenseBlockWritable>(qInputPath,
                                                           true,
-                                                          context
-                                                            
.getConfiguration());
+                                                          
context.getConfiguration());
       closeables.addFirst(qhatInput);
 
       /*
        * read all r files _in order of task ids_, i.e. partitions (aka group
-       * nums)
+       * nums).
+       * 
+       * Note: if broadcast option is used, this comes from distributed cache
+       * files rather than hdfs path.
        */
 
-      Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
+      SequenceFileDirValueIterator<VectorWritable> rhatInput;
+
+      boolean distributedRHat =
+        context.getConfiguration().get(PROP_RHAT_BROADCAST) != null;
+      if (distributedRHat) {
+
+        Path[] rFiles =
+          DistributedCache.getLocalCacheFiles(context.getConfiguration());
+
+        Validate.notNull(rFiles,
+                         "no RHat files in distributed cache job definition");
+
+        Configuration conf = new Configuration();
+        conf.set("fs.default.name", "file:///");
+
+        rhatInput =
+          new SequenceFileDirValueIterator<VectorWritable>(rFiles,
+                                                           
SSVDSolver.PARTITION_COMPARATOR,
+                                                           true,
+                                                           conf);
+
+      } else {
+        Path rPath = new Path(qJobPath, QJob.OUTPUT_RHAT + "-*");
+        rhatInput =
+          new SequenceFileDirValueIterator<VectorWritable>(rPath,
+                                                           PathType.GLOB,
+                                                           null,
+                                                           
SSVDSolver.PARTITION_COMPARATOR,
+                                                           true,
+                                                           
context.getConfiguration());
+      }
+
+      Validate.isTrue(rhatInput.hasNext(), "Empty R-hat input!");
 
-      SequenceFileDirValueIterator<VectorWritable> rhatInput =
-        new SequenceFileDirValueIterator<VectorWritable>(rPath,
-                                                         PathType.GLOB,
-                                                         null,
-                                                         
SSVDSolver.PARTITION_COMPARATOR,
-                                                         true,
-                                                         context
-                                                           
.getConfiguration());
       closeables.addFirst(rhatInput);
       outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
       closeables.addFirst(new 
IOUtils.MultipleOutputsCloseableAdapter(outputs));
@@ -230,7 +259,9 @@ public final class BtJob {
 
       btCollector =
         new SparseRowBlockAccumulator(context.getConfiguration()
-          .getInt(PROP_OUTER_PROD_BLOCK_HEIGHT, -1), btBlockCollector);
+                                             
.getInt(PROP_OUTER_PROD_BLOCK_HEIGHT,
+                                                     -1),
+                                      btBlockCollector);
       closeables.addFirst(btCollector);
 
     }
@@ -304,8 +335,7 @@ public final class BtJob {
         mBBt = new UpperTriangular(k + p);
 
         outputs = new MultipleOutputs(new JobConf(context.getConfiguration()));
-        closeables
-          .addFirst(new IOUtils.MultipleOutputsCloseableAdapter(outputs));
+        closeables.addFirst(new 
IOUtils.MultipleOutputsCloseableAdapter(outputs));
 
       }
     }
@@ -363,9 +393,8 @@ public final class BtJob {
           OutputCollector<Writable, Writable> collector =
             outputs.getCollector(OUTPUT_BBT, null);
 
-          collector
-            .collect(new IntWritable(),
-                     new VectorWritable(new DenseVector(mBBt.getData())));
+          collector.collect(new IntWritable(),
+                            new VectorWritable(new 
DenseVector(mBBt.getData())));
         }
       } finally {
         IOUtils.close(closeables);
@@ -384,26 +413,25 @@ public final class BtJob {
                          int p,
                          int btBlockHeight,
                          int numReduceTasks,
+                         boolean broadcast,
                          Class<? extends Writable> labelClass,
                          boolean outputBBtProducts)
     throws ClassNotFoundException, InterruptedException, IOException {
 
     JobConf oldApiJob = new JobConf(conf);
 
-    MultipleOutputs
-      .addNamedOutput(oldApiJob,
-                      OUTPUT_Q,
-                      org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-                      labelClass,
-                      VectorWritable.class);
+    MultipleOutputs.addNamedOutput(oldApiJob,
+                                   OUTPUT_Q,
+                                   
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                   labelClass,
+                                   VectorWritable.class);
 
     if (outputBBtProducts) {
-      MultipleOutputs
-        .addNamedOutput(oldApiJob,
-                        OUTPUT_BBT,
-                        
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
-                        IntWritable.class,
-                        VectorWritable.class);
+      MultipleOutputs.addNamedOutput(oldApiJob,
+                                     OUTPUT_BBT,
+                                     
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+                                     IntWritable.class,
+                                     VectorWritable.class);
     }
 
     /*
@@ -450,6 +478,30 @@ public final class BtJob {
 
     job.setNumReduceTasks(numReduceTasks);
 
+    /*
+     * we can broadcast R-hat files since all of them are required by each task,
+     * but not Q files, which correspond to splits of A (each split of A
+     * requires only its own particular Q file, a different one each time).
+     */
+
+    if (broadcast) {
+      job.getConfiguration().set(PROP_RHAT_BROADCAST, "y");
+
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] fstats =
+        fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*"));
+      if (fstats != null) {
+        for (FileStatus fstat : fstats) {
+          /*
+           * the new API is not enabled in our dependencies yet, so we are still
+           * using the deprecated one
+           */
+          DistributedCache.addCacheFile(fstat.getPath().toUri(),
+                                        job.getConfiguration());
+        }
+      }
+    }
+
     job.submit();
     job.waitForCompletion(false);
 

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDCli.java
 Wed Dec 21 04:08:04 2011
@@ -45,12 +45,15 @@ public class SSVDCli extends AbstractJob
     addOutputOption();
     addOption("rank", "k", "decomposition rank", true);
     addOption("oversampling", "p", "oversampling", String.valueOf(15));
-    addOption("blockHeight", "r", "Y block height (must be > (k+p))", 
String.valueOf(10000));
+    addOption("blockHeight",
+              "r",
+              "Y block height (must be > (k+p))",
+              String.valueOf(10000));
     addOption("outerProdBlockHeight",
               "oh",
               "block height of outer products during multiplication, increase 
for sparse inputs",
               String.valueOf(30000));
-    addOption("abtBlockHeight", 
+    addOption("abtBlockHeight",
               "abth",
               "block height of Y_i in ABtJob during AB' multiplication, 
increase for extremely sparse inputs",
               String.valueOf(200000));
@@ -68,11 +71,15 @@ public class SSVDCli extends AbstractJob
     addOption("reduceTasks",
               "t",
               "number of reduce tasks (where applicable)",
-              String.valueOf(1));
+              true);
     addOption("powerIter",
               "q",
               "number of additional power iterations (0..2 is good)",
               String.valueOf(0));
+    addOption("broadcast",
+              "br",
+              "whether use distributed cache to broadcast matrices wherever 
possible",
+              String.valueOf(true));
     addOption(DefaultOptionCreator.overwriteOption().create());
 
     Map<String, String> pargs = parseArguments(args);
@@ -92,6 +99,7 @@ public class SSVDCli extends AbstractJob
     boolean cUHalfSigma = Boolean.parseBoolean(pargs.get("--uHalfSigma"));
     boolean cVHalfSigma = Boolean.parseBoolean(pargs.get("--vHalfSigma"));
     int reduceTasks = Integer.parseInt(pargs.get("--reduceTasks"));
+    boolean broadcast = Boolean.parseBoolean(pargs.get("--broadcast"));
     boolean overwrite =
       pargs.containsKey(keyFor(DefaultOptionCreator.OVERWRITE_OPTION));
 
@@ -116,6 +124,7 @@ public class SSVDCli extends AbstractJob
     solver.setOuterBlockHeight(h);
     solver.setAbtBlockHeight(abh);
     solver.setQ(q);
+    solver.setBroadcast(broadcast);
     solver.setOverwrite(overwrite);
 
     solver.run();
@@ -128,14 +137,16 @@ public class SSVDCli extends AbstractJob
     SequenceFile.Writer sigmaW = null;
 
     try {
-      sigmaW = SequenceFile.createWriter(fs,
-                                conf,
-                                getOutputPath("sigma"),
-                                NullWritable.class,
-                                VectorWritable.class);
+      sigmaW =
+        SequenceFile.createWriter(fs,
+                                  conf,
+                                  getOutputPath("sigma"),
+                                  NullWritable.class,
+                                  VectorWritable.class);
       Writable sValues =
-        new VectorWritable(new DenseVector(Arrays.copyOf(solver
-          .getSingularValues(), k), true));
+        new VectorWritable(new 
DenseVector(Arrays.copyOf(solver.getSingularValues(),
+                                                         k),
+                                           true));
       sigmaW.append(NullWritable.get(), sValues);
 
     } finally {

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/SSVDSolver.java
 Wed Dec 21 04:08:04 2011
@@ -117,6 +117,7 @@ public class SSVDSolver {
   private boolean cUHalfSigma;
   private boolean cVHalfSigma;
   private boolean overwrite;
+  private boolean broadcast = true;
 
   /**
    * create new SSVD solver. Required parameters are passed to constructor to
@@ -284,6 +285,20 @@ public class SSVDSolver {
     this.abtBlockHeight = abtBlockHeight;
   }
 
+  public boolean isBroadcast() {
+    return broadcast;
+  }
+
+  /**
+   * If this property is true, use the DistributedCache mechanism to broadcast
+   * some matrices (B', R-hat) to the tasks. May improve efficiency. Default is
+   * true.
+   *
+   * @param broadcast whether to broadcast matrices via DistributedCache
+   */
+  public void setBroadcast(boolean broadcast) {
+    this.broadcast = broadcast;
+  }
+
   /**
    * run all SSVD jobs.
    * 
@@ -292,7 +307,7 @@ public class SSVDSolver {
    */
   public void run() throws IOException {
 
-    Deque<Closeable> closeables = Lists.<Closeable>newLinkedList();
+    Deque<Closeable> closeables = Lists.<Closeable> newLinkedList();
     try {
       Class<? extends Writable> labelType =
         sniffInputLabelType(inputPath, conf);
@@ -322,8 +337,12 @@ public class SSVDSolver {
                seed,
                reduceTasks);
 
-      // restrict number of reducers to a reasonable number
-      // so we don't have to run too many additions in the frontend.
+      /*
+       * restrict number of reducers to a reasonable number so we don't have to
+       * run too many additions in the frontend when reconstructing BBt for the
+       * last B' and BB' computations. The user may not realize that and give a
+       * bit too many (I would be happy if that were ever the case, though).
+       */
 
       BtJob.run(conf,
                 inputPath,
@@ -333,7 +352,8 @@ public class SSVDSolver {
                 k,
                 p,
                 outerBlockHeight,
-                Math.min(1000, reduceTasks),
+                q <= 0 ? Math.min(1000, reduceTasks) : reduceTasks,
+                broadcast,
                 labelType,
                 q <= 0);
 
@@ -351,7 +371,8 @@ public class SSVDSolver {
                            k,
                            p,
                            abtBlockHeight,
-                           reduceTasks);
+                           reduceTasks,
+                           broadcast);
 
         btPath = new Path(outputPath, String.format("Bt-job-%d", i + 1));
 
@@ -363,7 +384,8 @@ public class SSVDSolver {
                   k,
                   p,
                   outerBlockHeight,
-                  Math.min(1000, reduceTasks),
+                  i == q - 1 ? Math.min(1000, reduceTasks) : reduceTasks,
+                  broadcast,
                   labelType,
                   i == q - 1);
       }
@@ -499,7 +521,8 @@ public class SSVDSolver {
       if (!fstats[0].isDir()) {
         firstSeqFile = fstats[0];
       } else {
-        firstSeqFile = fs.listStatus(fstats[0].getPath(), 
PathFilters.logsCRCFilter())[0];
+        firstSeqFile =
+          fs.listStatus(fstats[0].getPath(), PathFilters.logsCRCFilter())[0];
       }
 
       SequenceFile.Reader r = null;
@@ -566,7 +589,6 @@ public class SSVDSolver {
 
     List<double[]> denseData = Lists.newArrayList();
 
-
     /*
      * assume it is partitioned output, so we need to read them up in order of
      * partitions.

Modified: 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
 (original)
+++ 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverDenseTest.java
 Wed Dec 21 04:08:04 2011
@@ -132,6 +132,7 @@ public class LocalSSVDSolverDenseTest ex
     ssvd.setAbtBlockHeight(400);
     ssvd.setOverwrite(true);
     ssvd.setQ(q);
+    ssvd.setBroadcast(false);
     ssvd.run();
 
     double[] stochasticSValues = ssvd.getSingularValues();

Modified: 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java?rev=1221603&r1=1221602&r2=1221603&view=diff
==============================================================================
--- 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
 (original)
+++ 
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/stochasticsvd/LocalSSVDSolverSparseSequentialTest.java
 Wed Dec 21 04:08:04 2011
@@ -152,6 +152,7 @@ public class LocalSSVDSolverSparseSequen
     
     ssvd.setOverwrite(true);
     ssvd.setQ(q);
+    ssvd.setBroadcast(true);
     ssvd.run();
 
     double[] stochasticSValues = ssvd.getSingularValues();


Reply via email to