Repository: mahout Updated Branches: refs/heads/master e24c4afb6 -> 2d1b0bf63
MAHOUT-1500: Code cleanup and javadocs closes apache/mahout#50 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2d1b0bf6 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2d1b0bf6 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2d1b0bf6 Branch: refs/heads/master Commit: 2d1b0bf632724ceb091035582274201269cfe3e3 Parents: e24c4af Author: Andrew Palumbo <[email protected]> Authored: Fri Sep 5 16:44:21 2014 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Sep 5 16:44:21 2014 -0400 ---------------------------------------------------------------------- .../mahout/h2obindings/H2OBlockMatrix.java | 40 +++- .../apache/mahout/h2obindings/H2OContext.java | 23 +- .../org/apache/mahout/h2obindings/H2OHdfs.java | 60 +++-- .../apache/mahout/h2obindings/H2OHelper.java | 240 +++++++++++++------ .../apache/mahout/h2obindings/drm/H2OBCast.java | 47 +++- .../apache/mahout/h2obindings/drm/H2ODrm.java | 11 + .../org/apache/mahout/h2obindings/ops/ABt.java | 28 ++- .../org/apache/mahout/h2obindings/ops/AewB.java | 30 ++- .../mahout/h2obindings/ops/AewScalar.java | 23 +- .../org/apache/mahout/h2obindings/ops/At.java | 26 +- .../org/apache/mahout/h2obindings/ops/AtA.java | 29 ++- .../org/apache/mahout/h2obindings/ops/AtB.java | 28 ++- .../org/apache/mahout/h2obindings/ops/Atx.java | 33 ++- .../org/apache/mahout/h2obindings/ops/Ax.java | 26 +- .../apache/mahout/h2obindings/ops/Cbind.java | 45 ++-- .../apache/mahout/h2obindings/ops/MapBlock.java | 79 +++--- .../org/apache/mahout/h2obindings/ops/Par.java | 41 ++-- .../apache/mahout/h2obindings/ops/Rbind.java | 22 +- .../apache/mahout/h2obindings/ops/RowRange.java | 45 ++-- .../h2obindings/ops/TimesRightMatrix.java | 39 +-- .../apache/mahout/h2obindings/H2OEngine.scala | 24 +- .../h2obindings/drm/CheckpointedDrmH2O.scala | 24 +- 22 files changed, 650 insertions(+), 313 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java index 27c1d58..35ddb39 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OBlockMatrix.java @@ -25,7 +25,7 @@ import org.apache.mahout.math.SparseMatrix; import water.fvec.Chunk; -/* +/** * A Matrix implementation to represent a vertical Block of DRM. * * Creation of the matrix is an O(1) operation with negligible @@ -39,67 +39,81 @@ import water.fvec.Chunk; * input matrix save on the copy overhead. */ public class H2OBlockMatrix extends AbstractMatrix { - Chunk _chks[]; - Matrix cow; /* Copy on Write */ + /** Backing chunks which store the original matrix data */ + private Chunk chks[]; + /** Copy on write matrix created on demand when original matrix is modified */ + private Matrix cow; + /** Class constructor. */ public H2OBlockMatrix(Chunk chks[]) { super(chks[0].len(), chks.length); - _chks = chks; + this.chks = chks; } + /** + * Internal method to create the copy on write matrix. 
+ * + * Once created, all further operations are performed on the CoW matrix + */ private void cow() { if (cow != null) { return; } - if (_chks[0].isSparse()) { - cow = new SparseMatrix(_chks[0].len(), _chks.length); + if (chks[0].isSparse()) { + cow = new SparseMatrix(chks[0].len(), chks.length); } else { - cow = new DenseMatrix(_chks[0].len(), _chks.length); + cow = new DenseMatrix(chks[0].len(), chks.length); } - for (int c = 0; c < _chks.length; c++) { - for (int r = 0; r < _chks[0].len(); r++) { - cow.setQuick(r, c, _chks[c].at0(r)); + for (int c = 0; c < chks.length; c++) { + for (int r = 0; r < chks[0].len(); r++) { + cow.setQuick(r, c, chks[c].at0(r)); } } } + @Override public void setQuick(int row, int col, double val) { cow(); cow.setQuick(row, col, val); } + @Override public Matrix like(int nrow, int ncol) { - if (_chks[0].isSparse()) { + if (chks[0].isSparse()) { return new SparseMatrix(nrow, ncol); } else { return new DenseMatrix(nrow, ncol); } } + @Override public Matrix like() { - if (_chks[0].isSparse()) { + if (chks[0].isSparse()) { return new SparseMatrix(rowSize(), columnSize()); } else { return new DenseMatrix(rowSize(), columnSize()); } } + @Override public double getQuick(int row, int col) { if (cow != null) { return cow.getQuick(row, col); } else { - return _chks[col].at0(row); + return chks[col].at0(row); } } + @Override public Matrix assignRow(int row, Vector v) { cow(); cow.assignRow(row, v); return cow; } + @Override public Matrix assignColumn(int col, Vector v) { cow(); cow.assignColumn(col, v); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/H2OContext.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OContext.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OContext.java index 1307ef8..96a2f8f 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OContext.java +++ 
b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OContext.java @@ -19,17 +19,20 @@ package org.apache.mahout.h2obindings; import water.H2O; +/** + * Context to an H2O Cloud. + */ public class H2OContext { - String masterURL; - - /* @masterURL should actually be the cloud name (name of cluster) to which - all the H2O worker nodes "join into". This is not a hostname or IP address - of a server, but a string which all cluster members agree on. - */ - public H2OContext(String _masterURL) { - masterURL = _masterURL; - - H2O.main(new String[]{"-md5skip", "-name", _masterURL}); + /** + * Class constructor. + * + * @param masterURL The cloud name (name of cluster) to which all the H2O + * worker nodes "join into". This is not a hostname or IP + * address of a server, but a string which all cluster + * members agree on. + */ + public H2OContext(String masterURL) { + H2O.main(new String[]{"-md5skip", "-name", masterURL}); H2O.joinOthers(); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java index 363c2df..f21ebe0 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHdfs.java @@ -45,8 +45,18 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.util.ReflectionUtils; - +/** + * SequenceFile I/O class (on HDFS) + */ public class H2OHdfs { + /** + * Predicate to check if a given filename is a SequenceFile. + * + * Inspect the first three bytes to determine the format of the file. + * + * @param filename Name of the file to check. + * @return True if file is of SequenceFile format. 
+ */ public static boolean isSeqfile(String filename) { try { String uri = filename; @@ -69,6 +79,15 @@ public class H2OHdfs { } } + /** + * Create DRM from SequenceFile. + * + * Create a Mahout DRM backed on H2O from the specified SequenceFile. + * + * @param filename Name of the sequence file. + * @param parMin Minimum number of data partitions in the DRM. + * @return DRM object created. + */ public static H2ODrm drmFromFile(String filename, int parMin) { try { if (isSeqfile(filename)) { @@ -81,6 +100,9 @@ public class H2OHdfs { } } + /** + * Internal method called from <code>drmFromFile</code> if format verified. + */ public static H2ODrm drmFromSeqfile(String filename, int parMin) { long rows = 0; int cols = 0; @@ -95,7 +117,7 @@ public class H2OHdfs { FileSystem fs = FileSystem.get(URI.create(uri), conf); Vec.Writer writers[]; Vec.Writer labelwriter = null; - boolean is_int_key = false, is_long_key = false, is_string_key = false; + boolean isIntKey = false, isLongKey = false, isStringKey = false; reader = new SequenceFile.Reader(fs, path, conf); @@ -114,11 +136,11 @@ public class H2OHdfs { long start = reader.getPosition(); if (reader.getKeyClass() == Text.class) { - is_string_key = true; + isStringKey = true; } else if (reader.getKeyClass() == LongWritable.class) { - is_long_key = true; + isLongKey = true; } else { - is_int_key = true; + isIntKey = true; } while (reader.next(key, value)) { @@ -126,13 +148,13 @@ public class H2OHdfs { Vector v = value.get(); cols = Math.max(v.size(), cols); } - if (is_long_key) { + if (isLongKey) { rows = Math.max(((LongWritable)(key)).get()+1, rows); } - if (is_int_key) { + if (isIntKey) { rows = Math.max(((IntWritable)(key)).get()+1, rows); } - if (is_string_key) { + if (isStringKey) { rows++; } } @@ -152,10 +174,10 @@ public class H2OHdfs { long r = 0; while (reader.next(key, value)) { Vector v = value.get(); - if (is_long_key) { + if (isLongKey) { r = ((LongWritable)(key)).get(); } - if (is_int_key) { + if (isIntKey) { r = 
((IntWritable)(key)).get(); } for (int c = 0; c < v.size(); c++) { @@ -164,7 +186,7 @@ public class H2OHdfs { if (labels != null) { labelwriter.set(r, ((Text)key).toString()); } - if (is_string_key) { + if (isStringKey) { r++; } } @@ -185,15 +207,21 @@ public class H2OHdfs { return new H2ODrm(frame, labels); } - public static void drmToFile(String filename, H2ODrm Drm) throws java.io.IOException { - Frame frame = Drm.frame; - Vec labels = Drm.keys; + /** + * Create SequenceFile on HDFS from DRM object. + * + * @param filename Filename to create and store DRM data in. + * @param drm DRM object storing Matrix data in memory. + */ + public static void drmToFile(String filename, H2ODrm drm) throws java.io.IOException { + Frame frame = drm.frame; + Vec labels = drm.keys; String uri = filename; Configuration conf = new Configuration(); Path path = new Path(uri); FileSystem fs = FileSystem.get(URI.create(uri), conf); SequenceFile.Writer writer = null; - boolean is_sparse = H2OHelper.isSparse(frame); + boolean isSparse = H2OHelper.isSparse(frame); ValueString vstr = new ValueString(); if (labels != null) { @@ -204,7 +232,7 @@ public class H2OHdfs { for (long r = 0; r < frame.anyVec().length(); r++) { Vector v = null; - if (is_sparse) { + if (isSparse) { v = new SequentialAccessSparseVector(frame.numCols()); } else { v = new DenseVector(frame.numCols()); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java index 71d93dc..294ec7e 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java @@ -36,24 +36,35 @@ import java.util.HashMap; import org.apache.mahout.h2obindings.drm.H2ODrm; +/** + * Collection of helper 
methods for H2O backend. + */ public class H2OHelper { - - /* - Is the matrix sparse? If the number of missing elements is - 32 x times the number of present elements, treat it as sparse - */ + /** + * Predicate to check if data is sparse in Frame. + * + * If the number of missing elements is 32x times the number of present + * elements, consider it as sparse. + * + * @param frame Frame storing matrix data. + * @return True if data is sparse in Frame. + */ public static boolean isSparse(Frame frame) { long rows = frame.numRows(); long cols = frame.numCols(); - /* MRTask to aggregate precalculated per-chunk sparse lengths */ + /** + * MRTask to aggregate precalculated per-chunk sparse lengths + */ class MRTaskNZ extends MRTask<MRTaskNZ> { long sparselen; + @Override public void map(Chunk chks[]) { for (Chunk chk : chks) { sparselen += chk.sparseLen(); } } + @Override public void reduce(MRTaskNZ other) { sparselen += other.sparselen; } @@ -64,11 +75,15 @@ public class H2OHelper { return (((rows * cols) / (sparselen + 1)) > 32); } - /* - Extract a Matrix from a Frame. Create either Sparse or - Dense Matrix depending on number of missing elements - in Frame. - */ + /** + * Create a Mahout Matrix from a DRM. + * + * Create either Sparse or Dense Matrix depending on number of missing + * elements in DRM. + * + * @param drm DRM object to create Matrix from. + * @return created Matrix. + */ public static Matrix matrixFromDrm(H2ODrm drm) { Frame frame = drm.frame; Vec labels = drm.keys; @@ -81,7 +96,7 @@ public class H2OHelper { } int c = 0; - /* Fill matrix, column at a time */ + // Fill matrix, column at a time. for (Vec v : frame.vecs()) { for (int r = 0; r < frame.numRows(); r++) { double d = 0.0; @@ -92,7 +107,7 @@ public class H2OHelper { c++; } - /* If string keyed, set the stings as rowlabels */ + // If string keyed, set the stings as rowlabels. 
if (labels != null) { HashMap<String,Integer> map = new HashMap<String,Integer>(); ValueString vstr = new ValueString(); @@ -104,12 +119,14 @@ public class H2OHelper { return m; } - /* Calculate Means of elements in a column, and return - as a vector. - - H2O precalculates means in a Vec, and a Vec corresponds - to a column. - */ + /** + * Calculate Means of elements in a column, and return as a Vector. + * + * H2O precalculates means in a Vec, and a Vec corresponds to a column. + * + * @param frame Frame backing the H2O DRM. + * @return Vector of pre-calculated means. + */ public static Vector colMeans(Frame frame) { double means[] = new double[frame.numCols()]; for (int i = 0; i < frame.numCols(); i++) { @@ -118,16 +135,22 @@ public class H2OHelper { return new DenseVector(means); } - /* Calculate Sum of all elements in a column, and - return as a Vector - - Run an MRTask Job to add up sums in @_sums - - WARNING: Vulnerable to overflow. No way around it. - */ + /** + * Calculate Sums of elements in a column, and return as a Vector. + * + * Run an MRTask Job to add up sums. + * WARNING: Vulnerable to overflow. No way around it. + * + * @param frame Frame backing the H2O DRM. + * @return Vector of calculated sums. + */ public static Vector colSums(Frame frame) { + /** + * MRTask to calculate sums of elements in all columns. + */ class MRTaskSum extends MRTask<MRTaskSum> { public double sums[]; + @Override public void map(Chunk chks[]) { sums = new double[chks.length]; @@ -137,6 +160,7 @@ public class H2OHelper { } } } + @Override public void reduce(MRTaskSum other) { ArrayUtils.add(sums, other.sums); } @@ -144,14 +168,22 @@ public class H2OHelper { return new DenseVector(new MRTaskSum().doAll(frame).sums); } - - /* Calculate Sum of squares of all elements in the Matrix - - WARNING: Vulnerable to overflow. No way around it. - */ + /** + * Calculate Sum of squares of all elements in the DRM. + * + * Run an MRTask Job to add up sums of squares. 
+ * WARNING: Vulnerable to overflow. No way around it. + * + * @param frame Frame backing the H2O DRM. + * @return Sum of squares of all elements in the DRM. + */ public static double sumSqr(Frame frame) { + /** + * MRTask to calculate sums of squares of all elements. + */ class MRTaskSumSqr extends MRTask<MRTaskSumSqr> { public double sumSqr; + @Override public void map(Chunk chks[]) { for (int c = 0; c < chks.length; c++) { for (int r = 0; r < chks[c].len(); r++) { @@ -159,6 +191,7 @@ public class H2OHelper { } } } + @Override public void reduce(MRTaskSumSqr other) { sumSqr += other.sumSqr; } @@ -166,16 +199,21 @@ public class H2OHelper { return new MRTaskSumSqr().doAll(frame).sumSqr; } - /* Calculate Sum of all elements in a column, and - return as a Vector - - Run an MRTask Job to add up sums in @_sums - - WARNING: Vulnerable to overflow. No way around it. - */ + /** + * Count non-zero elements in all columns, and return as a Vector. + * + * Run an MRTask Job to count non-zero elements per column. + * + * @param frame Frame backing the H2O DRM. + * @return Vector of counted non-zero elements. + */ public static Vector nonZeroCnt(Frame frame) { + /** + * MRTask to count all non-zero elements. 
+ */ class MRTaskNonZero extends MRTask<MRTaskNonZero> { public double sums[]; + @Override public void map(Chunk chks[]) { sums = new double[chks.length]; @@ -187,6 +225,7 @@ public class H2OHelper { } } } + @Override public void reduce(MRTaskNonZero other) { ArrayUtils.add(sums, other.sums); } @@ -194,7 +233,7 @@ public class H2OHelper { return new DenseVector(new MRTaskNonZero().doAll(frame).sums); } - /* Convert String->Integer map to Integer->String map */ + /** Convert String->Integer map to Integer->String map */ private static Map<Integer,String> reverseMap(Map<String, Integer> map) { if (map == null) { return null; @@ -209,46 +248,64 @@ public class H2OHelper { return rmap; } - private static int chunkSize(long nrow, int ncol, int min, int exact) { - int chunk_sz; - int parts_hint = Math.max(min, exact); - - if (parts_hint < 1) { + /** + * Calculate optimum chunk size for given parameters. + * + * Chunk size is the number of elements stored per partition per column. + * + * @param nrow Number of rows in the DRM. + * @param ncol Number of columns in the DRM. + * @param minHint Minimum number of partitions to create, if passed value is not -1. + * @param exactHint Exact number of partitions to create, if passed value is not -1. + * @return Calculated optimum chunk size. 
+ */ + private static int chunkSize(long nrow, int ncol, int minHint, int exactHint) { + int chunkSz; + int partsHint = Math.max(minHint, exactHint); + + if (partsHint < 1) { /* XXX: calculate based on cloud size and # of cpu */ - parts_hint = 4; + partsHint = 4; } - chunk_sz = (int)(((nrow - 1) / parts_hint) + 1); - if (exact > 0) { - return chunk_sz; + chunkSz = (int)(((nrow - 1) / partsHint) + 1); + if (exactHint > 0) { + return chunkSz; } - if (chunk_sz > 1e6) { - chunk_sz = (int)1e6; + if (chunkSz > 1e6) { + chunkSz = (int)1e6; } - if (min > 0) { - return chunk_sz; + if (minHint > 0) { + return chunkSz; } - if (chunk_sz < 1e3) { - chunk_sz = (int)1e3; + if (chunkSz < 1e3) { + chunkSz = (int)1e3; } - return chunk_sz; + return chunkSz; } - /* Ingest a Matrix into an H2O Frame. H2O Frame is the "backing" - data structure behind CheckpointedDrm. Steps: - */ - public static H2ODrm drmFromMatrix(Matrix m, int min_hint, int exact_hint) { - /* First create an empty (0-filled) frame of the required dimensions */ - Frame frame = emptyFrame(m.rowSize(), m.columnSize(), min_hint, exact_hint); + /** + * Ingest a Mahout Matrix into an H2O DRM. + * + * Frame is the backing data structure behind CheckpointedDrm. + * + * @param m Mahout Matrix to ingest data from. + * @param minHint Hint for minimum number of partitions in created DRM. + * @param exactHint Hint for exact number of partitions in created DRM. + * @return Created H2O backed DRM. 
+ */ + public static H2ODrm drmFromMatrix(Matrix m, int minHint, int exactHint) { + // First create an empty (0-filled) frame of the required dimensions + Frame frame = emptyFrame(m.rowSize(), m.columnSize(), minHint, exactHint); Vec labels = null; Vec.Writer writers[] = new Vec.Writer[m.columnSize()]; Futures closer = new Futures(); - /* "open" vectors for writing efficiently in bulk */ + // "open" vectors for writing efficiently in bulk for (int i = 0; i < writers.length; i++) { writers[i] = frame.vecs()[i].open(); } @@ -263,10 +320,10 @@ public class H2OHelper { writers[c].close(closer); } - /* If string labeled matrix, create aux Vec */ + // If string labeled matrix, create aux Vec Map<String,Integer> map = m.getRowLabelBindings(); if (map != null) { - /* label vector must be similarly partitioned like the Frame */ + // label vector must be similarly partitioned like the Frame labels = frame.anyVec().makeZero(); Vec.Writer writer = labels.open(); Map<Integer,String> rmap = reverseMap(map); @@ -283,20 +340,47 @@ public class H2OHelper { return new H2ODrm(frame, labels); } - public static Frame emptyFrame(long nrow, int ncol, int min_hint, int exact_hint) { + /** + * Create an empty (zero-filled) H2O Frame efficiently. + * + * Create a zero filled Frame with specified cardinality. + * Do not actually fill zeroes in each cell, create pre-compressed chunks. + * Time taken per column asymptotically at O(nChunks), not O(nrow). + * + * @param nrow Number of rows in the Frame. + * @param ncol Number of columns in the Frame. + * @param minHint Hint for minimum number of chunks per column in created Frame. + * @param exactHint Hint for exact number of chunks per column in created Frame. + * @return Created Frame. 
+ */ + public static Frame emptyFrame(long nrow, int ncol, int minHint, int exactHint) { Vec.VectorGroup vg = new Vec.VectorGroup(); - return emptyFrame(nrow, ncol, min_hint, exact_hint, vg); + return emptyFrame(nrow, ncol, minHint, exactHint, vg); } - public static Frame emptyFrame(long nrow, int ncol, int min_hint, int exact_hint, Vec.VectorGroup vg) { - int chunk_sz = chunkSize(nrow, ncol, min_hint, exact_hint); - int nchunks = (int)((nrow - 1) / chunk_sz) + 1; /* Final number of Chunks per Vec */ + /** + * Create an empty (zero-filled) H2O Frame efficiently. + * + * Create a zero filled Frame with specified cardinality. + * Do not actually fill zeroes in each cell, create pre-compressed chunks. + * Time taken per column asymptotically at O(nChunks), not O(nrow). + * + * @param nrow Number of rows in the Frame. + * @param ncol Number of columns in the Frame. + * @param minHint Hint for minimum number of chunks per column in created Frame. + * @param exactHint Hint for exact number of chunks per column in created Frame. + * @param vg Shared VectorGroup so that all columns are similarly partitioned. + * @return Created Frame. + */ + public static Frame emptyFrame(long nrow, int ncol, int minHint, int exactHint, Vec.VectorGroup vg) { + int chunkSz = chunkSize(nrow, ncol, minHint, exactHint); + int nchunks = (int)((nrow - 1) / chunkSz) + 1; // Final number of Chunks per Vec long espc[] = new long[nchunks + 1]; final Vec[] vecs = new Vec[ncol]; for (int i = 0; i < nchunks; i++) { - espc[i] = i * chunk_sz; + espc[i] = i * chunkSz; } espc[nchunks] = nrow; @@ -307,7 +391,19 @@ public class H2OHelper { return new Frame(vecs); } - public static H2ODrm emptyDrm(long nrow, int ncol, int min_hint, int exact_hint) { - return new H2ODrm(emptyFrame(nrow, ncol, min_hint, exact_hint)); + /** + * Create an empty (zero-filled) H2O DRM. + * + * Create a zero filled DRM with specified cardinality. + * Use the efficient emptyFrame() method internally. 
+ * + * @param nrow Number of rows in the Frame. + * @param ncol Number of columns in the Frame. + * @param minHint Hint for minimum number of chunks per column in created Frame. + * @param exactHint Hint for exact number of chunks per column in created Frame. + * @return Created DRM. + */ + public static H2ODrm emptyDrm(long nrow, int ncol, int minHint, int exactHint) { + return new H2ODrm(emptyFrame(nrow, ncol, minHint, exactHint)); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java index 7395027..fed5da2 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java @@ -31,28 +31,44 @@ import java.io.ByteArrayInputStream; import java.io.ObjectOutputStream; import java.io.ObjectInputStream; -/* Handle Matrix and Vector separately so that we can live with - just importing MatrixWritable and VectorWritable. -*/ - +/** + * Broadcast class wrapper around Matrix and Vector. + * + * Use MatrixWritable and VectorWritable internally. + * Even though the class is generically typed, we do runtime + * enforcement to assert the type is either Matrix or Vector. + * + * H2OBCast object is created around a Matrix or Vector. Matrix or Vector + * objects cannot be freely referred in closures. Instead create and refer the + * corresponding H2OBCast object. The original Matrix or Vector can be + * obtained by calling the ->value() method on the H2OBCast object within a + * closure. + */ public class H2OBCast<T> implements BCast<T>, Serializable { - transient T obj; - byte buf[]; - boolean is_matrix; + private transient T obj; + private byte buf[]; + private boolean isMatrix; + /** + * Class constructor. 
+ */ public H2OBCast(T o) { obj = o; if (o instanceof Matrix) { buf = serialize(new MatrixWritable((Matrix)o)); - is_matrix = true; + isMatrix = true; } else if (o instanceof Vector) { buf = serialize(new VectorWritable((Vector)o)); + isMatrix = false; } else { throw new IllegalArgumentException("Only Matrix or Vector supported for now"); } } + /** + * Get the serialized object. + */ public T value() { if (obj == null) { obj = deserialize(buf); @@ -60,6 +76,13 @@ public class H2OBCast<T> implements BCast<T>, Serializable { return obj; } + /** + * Internal method to serialize the object. + * + * @param w Either MatrixWritable or VectorWritable corresponding to + * either Matrix or Vector as the class is typed. + * @return serialized sequence of bytes. + */ private byte[] serialize(Writable w) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { @@ -72,12 +95,18 @@ public class H2OBCast<T> implements BCast<T>, Serializable { return bos.toByteArray(); } + /** + * Internal method to deserialize a sequence of bytes. + * + * @param buf Sequence of bytes previously serialized by serialize() method. + * @return The original Matrix or Vector object. 
+ */ private T deserialize(byte buf[]) { T ret = null; ByteArrayInputStream bis = new ByteArrayInputStream(buf); try { ObjectInputStream ois = new ObjectInputStream(bis); - if (is_matrix) { + if (isMatrix) { MatrixWritable w = new MatrixWritable(); w.readFields(ois); ret = (T) w.get(); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2ODrm.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2ODrm.java b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2ODrm.java index 058ea0a..7288dff 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2ODrm.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2ODrm.java @@ -20,15 +20,26 @@ package org.apache.mahout.h2obindings.drm; import water.fvec.Frame; import water.fvec.Vec; +/** + * Class which represents a Mahout DRM in H2O. + */ public class H2ODrm { + /** frame stores all the numerical data of a DRM. */ public Frame frame; + /** keys stores the row key bindings (String or Long) */ public Vec keys; + /** + * Class constructor. Null key represents Int keyed DRM. + */ public H2ODrm(Frame m) { frame = m; keys = null; } + /** + * Class constructor. Both Numerical and row key bindings specified. 
+ */ public H2ODrm(Frame m, Vec k) { frame = m; keys = k; http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/ABt.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/ABt.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/ABt.java index 1e91ace..c713e27 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/ABt.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/ABt.java @@ -25,27 +25,35 @@ import water.fvec.Vec; import water.fvec.Chunk; import water.fvec.NewChunk; +/** + * Calculate AB' + */ public class ABt { - /* Calculate AB' */ + /** + * Calculate AB' on two DRMs to create a new DRM holding the result. + * + * @param drmA DRM representing matrix A + * @param drmB DRM representing matrix B + * @return new DRM containing AB' + */ public static H2ODrm exec(H2ODrm drmA, H2ODrm drmB) { Frame A = drmA.frame; Vec keys = drmA.keys; final Frame B = drmB.frame; int ABt_cols = (int)B.numRows(); - /* ABt is written into ncs[] with an MRTask on A, and therefore will - be similarly partitioned as A. - - chks.length == A.numCols() (== B.numCols()) - ncs.length == ABt_cols (B.numRows()) - */ + // ABt is written into ncs[] with an MRTask on A, and therefore will + // be similarly partitioned as A. 
+ // + // chks.length == A.numCols() (== B.numCols()) + // ncs.length == ABt_cols (B.numRows()) Frame ABt = new MRTask() { public void map(Chunk chks[], NewChunk ncs[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vec B_vecs[] = B.vecs(); for (int c = 0; c < ncs.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = 0; for (int i = 0; i < chks.length; i++) { v += (chks[i].at0(r) * B_vecs[i].at(c)); @@ -56,7 +64,7 @@ public class ABt { } }.doAll(ABt_cols, A).outputFrame(null, null); - /* Carry forward labels of A blindly into ABt */ + // Carry forward labels of A blindly into ABt return new H2ODrm(ABt, keys); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewB.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewB.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewB.java index 20d6a6d..ed4e6eb 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewB.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewB.java @@ -25,20 +25,29 @@ import water.fvec.Vec; import water.fvec.Chunk; import water.fvec.NewChunk; +/** + * Element-wise DRM-DRM operations + */ public class AewB { - /* Element-wise DRM-DRM operations */ + /** + * Perform element-wise operation on two DRMs to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param drmB DRM representing matrix B. + * @param op Element-wise operator encoded as a String. + * @return new DRM containing A (element-wise) B. + */ public static H2ODrm exec(H2ODrm drmA, H2ODrm drmB, final String op) { final Frame A = drmA.frame; final Frame B = drmB.frame; Vec keys = drmA.keys; int AewB_cols = A.numCols(); - /* AewB is written into ncs[] with an MRTask on A, and therefore will - be similarly partitioned as A. 
- - B may or may not be similarly partitioned as A, but must have the - same dimensions of A. - */ + // AewB is written into ncs[] with an MRTask on A, and therefore will + // be similarly partitioned as A. + // + // B may or may not be similarly partitioned as A, but must have the + // same dimensions of A. Frame AewB = new MRTask() { private double opfn(String op, double a, double b) { if (a == 0.0 && b == 0.0) { @@ -55,20 +64,21 @@ public class AewB { } return 0.0; } + @Override public void map(Chunk chks[], NewChunk ncs[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vec B_vecs[] = B.vecs(); long start = chks[0].start(); for (int c = 0; c < chks.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { ncs[c].addNum(opfn(op, chks[c].at0(r), B_vecs[c].at(start + r))); } } } }.doAll(AewB_cols, A).outputFrame(null, null); - /* Carry forward labels of A blindly into ABt */ + // Carry forward labels of A blindly into ABt return new H2ODrm(AewB, keys); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewScalar.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewScalar.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewScalar.java index 41182d7..0680169 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewScalar.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AewScalar.java @@ -25,16 +25,25 @@ import water.fvec.Vec; import water.fvec.Chunk; import water.fvec.NewChunk; +/** + * Element-wise DRM-Scalar operations + */ public class AewScalar { - /* Element-wise DRM-DRM operations */ + /** + * Perform element-wise operation on a DRM with a Scalar to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param s Scalar value represented as a double. 
+ * @param op Element-wise operator encoded as a String. + * @return new DRM containing A (element-wise) s. + */ public static H2ODrm exec(H2ODrm drmA, final double s, final String op) { Frame A = drmA.frame; Vec keys = drmA.keys; int AewScalar_cols = A.numCols(); - /* AewScalar is written into ncs[] with an MRTask on A, and therefore will - be similarly partitioned as A. - */ + // AewScalar is written into ncs[] with an MRTask on A, and therefore will + // be similarly partitioned as A. Frame AewScalar = new MRTask() { private double opfn(String op, double a, double b) { if (a == 0.0 && b == 0.0) { @@ -52,18 +61,18 @@ public class AewScalar { return 0.0; } public void map(Chunk chks[], NewChunk ncs[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); long start = chks[0].start(); for (int c = 0; c < chks.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { ncs[c].addNum(opfn(op, chks[c].at0(r), s)); } } } }.doAll(AewScalar_cols, A).outputFrame(null, null); - /* Carry forward labels of A blindly into ABt */ + // Carry forward labels of A blindly into AewScalar return new H2ODrm(AewScalar, keys); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/At.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/At.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/At.java index 4e8d17d..e3ee36a 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/At.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/At.java @@ -25,33 +25,39 @@ import water.fvec.Frame; import water.fvec.Vec; import water.fvec.Chunk; +/** + * Calculate A' (transpose) + */ public class At { - /* Calculate A' (transpose) */ + /** + * Perform transpose operation on a DRM to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @return new DRM containing A'.
+ */ public static H2ODrm exec(H2ODrm drmA) { final Frame A = drmA.frame; - /* First create a new frame of the required dimensions, A.numCols() rows - and A.numRows() columns. - */ + // First create a new frame of the required dimensions, A.numCols() rows + // and A.numRows() columns. Frame At = H2OHelper.emptyFrame(A.numCols(), (int) A.numRows(), -1, -1); - /* Execute MRTask on the new frame, and fill each cell (initially 0) by - pulling in the appropriate value from A. - */ + // Execute MRTask on the new frame, and fill each cell (initially 0) by + // pulling in the appropriate value from A. new MRTask() { public void map(Chunk chks[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); long start = chks[0].start(); Vec A_vecs[] = A.vecs(); for (int c = 0; c < chks.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { chks[c].set0(r, A_vecs[(int)(start + r)].at(c)); } } } }.doAll(At); - /* At is NOT similarly partitioned as A, drop labels */ + // At is NOT similarly partitioned as A, drop labels return new H2ODrm(At); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtA.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtA.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtA.java index 6ffcfc0..818837e 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtA.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtA.java @@ -25,27 +25,34 @@ import water.fvec.Frame; import water.fvec.Vec; import water.fvec.Chunk; +/** + * Calculate A'A + */ public class AtA { - /* Calculate A'A */ - public static H2ODrm exec(H2ODrm drmA) { + /** + * Perform A'A operation on a DRM to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @return new DRM containing A'A. 
+ */ + public static H2ODrm exec(H2ODrm drmA) { final Frame A = drmA.frame; - /* First create an empty Frame of the required dimensions */ + // First create an empty Frame of the required dimensions Frame AtA = H2OHelper.emptyFrame(A.numCols(), A.numCols(), -1, -1); - /* Execute MRTask on the new Frame, and fill each cell (initially 0) by - computing appropriate values from A. - - chks.length == A.numCols() - */ + // Execute MRTask on the new Frame, and fill each cell (initially 0) by + // computing appropriate values from A. + // + // chks.length == A.numCols() new MRTask() { public void map(Chunk chks[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); long start = chks[0].start(); Vec A_vecs[] = A.vecs(); long A_rows = A.numRows(); for (int c = 0; c < chks.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = 0; for (long i = 0; i < A_rows; i++) { v += (A_vecs[(int)(start + r)].at(i) * A_vecs[c].at(i)); @@ -56,7 +63,7 @@ public class AtA { } }.doAll(AtA); - /* AtA is NOT similarly partitioned as A, drop labels */ + // AtA is NOT similarly partitioned as A, drop labels return new H2ODrm(AtA); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtB.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtB.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtB.java index 72de4d1..1c4275e 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtB.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/AtB.java @@ -25,30 +25,38 @@ import water.fvec.Frame; import water.fvec.Vec; import water.fvec.Chunk; +/** + * Calculate A'B + */ public class AtB { - /* Calculate A'B */ + /** + * Perform A'B operation on two DRMs to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param drmB DRM representing matrix B. 
+ * @return new DRM containing A'B. + */ public static H2ODrm exec(H2ODrm drmA, H2ODrm drmB) { final Frame A = drmA.frame; final Frame B = drmB.frame; - /* First create an empty frame of the required dimensions */ + // First create an empty frame of the required dimensions Frame AtB = H2OHelper.emptyFrame(A.numCols(), B.numCols(), -1, -1); - /* Execute MRTask on the new Frame, and fill each cell (initially 0) by - computing appropriate values from A and B. - - chks.length == B.numCols() - */ + // Execute MRTask on the new Frame, and fill each cell (initially 0) by + // computing appropriate values from A and B. + // + // chks.length == B.numCols() new MRTask() { public void map(Chunk chks[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); long start = chks[0].start(); long A_rows = A.numRows(); Vec A_vecs[] = A.vecs(); Vec B_vecs[] = B.vecs(); for (int c = 0; c < chks.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = 0; for (long i = 0; i < A_rows; i++) { v += (A_vecs[(int)(start + r)].at(i) * B_vecs[c].at(i)); @@ -59,7 +67,7 @@ public class AtB { } }.doAll(AtB); - /* AtB is NOT similarly partitioned as A, drop labels */ + // AtB is NOT similarly partitioned as A, drop labels return new H2ODrm(AtB); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Atx.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Atx.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Atx.java index 306d1ac..fb954df 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Atx.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Atx.java @@ -30,27 +30,35 @@ import water.fvec.Frame; import water.fvec.Chunk; import water.util.ArrayUtils; +/** + * Calculate A'x (where x is an in-core Vector) + */ public class Atx { - /* Calculate A'x (where x 
is an in-core Vector) */ + /** + * Perform A'x operation with a DRM and an in-core Vector to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param x in-core Mahout Vector. + * @return new DRM containing A'x. + */ public static H2ODrm exec(H2ODrm drmA, Vector x) { Frame A = drmA.frame; final H2OBCast<Vector> bx = new H2OBCast<Vector>(x); - /* A'x is computed into atx[] with an MRTask on A (with - x available as a Broadcast - - x.size() == A.numRows() - atx.length == chks.length == A.numCols() - */ + // A'x is computed into atx[] with an MRTask on A (with + // x available as a Broadcast + // + // x.size() == A.numRows() + // atx.length == chks.length == A.numCols() class MRTaskAtx extends MRTask<MRTaskAtx> { double atx[]; public void map(Chunk chks[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vector x = bx.value(); long start = chks[0].start(); atx = new double[chks.length]; - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double d = x.getQuick((int)start + r); for (int c = 0; c < chks.length; c++) { atx[c] += (chks[c].at0(r) * d); @@ -62,10 +70,9 @@ public class Atx { } } - /* Take the result in .atx[], and convert into a Frame - using existing helper functions (creating a Matrix - along the way for the Helper) - */ + // Take the result in .atx[], and convert into a Frame + // using existing helper functions (creating a Matrix + // along the way for the Helper) Vector v = new DenseVector(new MRTaskAtx().doAll(A).atx); Matrix m = new DenseMatrix(A.numCols(), 1); m.assignColumn(0, v); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Ax.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Ax.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Ax.java index c93d487..548b39a 100644 --- 
a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Ax.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Ax.java @@ -28,24 +28,32 @@ import water.fvec.Vec; import water.fvec.Chunk; import water.fvec.NewChunk; +/** + * Calculate Ax (where x is an in-core Vector) + */ public class Ax { - /* Calculate Ax (where x is an in-core Vector) */ + /** + * Perform Ax operation with a DRM and an in-core Vector to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param x in-core Mahout Vector. + * @return new DRM containing Ax. + */ public static H2ODrm exec(H2ODrm drmA, Vector x) { Frame A = drmA.frame; Vec keys = drmA.keys; final H2OBCast<Vector> bx = new H2OBCast<Vector>(x); - /* Ax is written into nc (single element, not array) with an MRTask on A, - and therefore will be similarly partitioned as A. - - x.size() == A.numCols() == chks.length - */ + // Ax is written into nc (single element, not array) with an MRTask on A, + // and therefore will be similarly partitioned as A. 
+ // + // x.size() == A.numCols() == chks.length Frame Ax = new MRTask() { public void map(Chunk chks[], NewChunk nc) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vector x = bx.value(); - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = 0; for (int c = 0; c < chks.length; c++) { v += (chks[c].at0(r) * x.getQuick(c)); @@ -55,7 +63,7 @@ public class Ax { } }.doAll(1, A).outputFrame(null, null); - /* Carry forward labels of A blindly into ABt */ + // Carry forward labels of A blindly into Ax return new H2ODrm(Ax, keys); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Cbind.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Cbind.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Cbind.java index cb8a0b9..8656ca5 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Cbind.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Cbind.java @@ -24,64 +24,71 @@ import water.fvec.Chunk; import org.apache.mahout.h2obindings.drm.H2ODrm; +/** + * R-like cbind like operator, on two DRMs + */ public class Cbind { - /* R's cbind like operator, on DrmA and DrmB */ + /** + * Combine the columns of two DRMs A and B to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param drmB DRM representing matrix B. + * @return new DRM containing columns of A and B adjacent. + */ public static H2ODrm exec(H2ODrm drmA, H2ODrm drmB) { Frame fra = drmA.frame; Vec keysa = drmA.keys; Frame frb = drmB.frame; Vec keysb = drmB.keys; - /* If A and B are similarly partitioned, .. */ + // If A and B are similarly partitioned, .. if (fra.anyVec().group() == frb.anyVec().group()) { - /* .. then, do a light weight zip() */ + // .. then, do a light weight zip() return zip(fra, keysa, frb, keysb); } else { - /* ..
else, do a heavy weight join() which involves moving data over the wire */ + // .. else, do a heavy weight join() which involves moving data over the wire return join(fra, keysa, frb, keysb); } } - /* Light weight zip(), no data movement */ + /** Light weight zip(), no data movement */ private static H2ODrm zip(final Frame fra, final Vec keysa, final Frame frb, final Vec keysb) { - /* Create a new Vec[] to hold the concatenated list of A and B's column vectors */ + // Create a new Vec[] to hold the concatenated list of A and B's column vectors Vec vecs[] = new Vec[fra.vecs().length + frb.vecs().length]; int d = 0; - /* fill A's column vectors */ + // fill A's column vectors for (Vec vfra : fra.vecs()) { vecs[d++] = vfra; } - /* and B's */ + // and B's for (Vec vfrb : frb.vecs()) { vecs[d++] = vfrb; } - /* and create a new Frame with the combined list of column Vecs */ + // and create a new Frame with the combined list of column Vecs Frame fr = new Frame(vecs); /* Finally, inherit A's string labels into the result */ return new H2ODrm(fr, keysa); } - /* heavy weight join(), involves moving data */ + /** Heavy weight join(), involves moving data */ private static H2ODrm join(final Frame fra, final Vec keysa, final Frame frb, final Vec keysb) { - - /* The plan is to re-organize B to be "similarly partitioned as A", and then zip() */ + // The plan is to re-organize B to be "similarly partitioned as A", and then zip() Vec bvecs[] = new Vec[frb.vecs().length]; for (int i = 0; i < bvecs.length; i++) { - /* First create column Vecs which are similarly partitioned as A */ + // First create column Vecs which are similarly partitioned as A bvecs[i] = fra.anyVec().makeZero(); } - /* Next run an MRTask on the new vectors, and fill each cell (initially 0) - by pulling in appropriate values from B (frb) - */ + // Next run an MRTask on the new vectors, and fill each cell (initially 0) + // by pulling in appropriate values from B (frb) new MRTask() { public void map(Chunk chks[]) { 
- int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); long start = chks[0].start(); Vec vecs[] = frb.vecs(); - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { for (int c = 0; c < chks.length; c++) { // assert va.atStr(start+r) == vb.atStr(start+r) chks[c].set0(r, vecs[c].at(start + r)); @@ -90,7 +97,7 @@ public class Cbind { } }.doAll(bvecs); - /* now that bvecs[] is compatible, just zip'em'up */ + // now that bvecs[] is compatible, just zip'em'up return zip(fra, keysa, new Frame(bvecs), null); } } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/MapBlock.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/MapBlock.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/MapBlock.java index 0f901e4..5a50f03 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/MapBlock.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/MapBlock.java @@ -32,32 +32,51 @@ import java.util.Arrays; import scala.reflect.ClassTag; +/** + * MapBlock operator. + */ public class MapBlock { - public static <K,R> H2ODrm exec(H2ODrm drmA, int ncol, Object bmf, final boolean is_r_str, + /** + * Execute a BlockMapFunction on DRM partitions to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param ncol Number of columns output by BMF. + * @param bmf BlockMapFunction which maps input DRM partition to output. + * @param isRstr flag indicating if key type of output DRM is a String. + * @param k ClassTag of input DRM key type. + * @param r ClassTag of output DRM key type. + * @return new DRM constructed from mapped blocks of drmA through bmf.
+ */ + public static <K,R> H2ODrm exec(H2ODrm drmA, int ncol, Object bmf, final boolean isRstr, final ClassTag<K> k, final ClassTag<R> r) { Frame A = drmA.frame; Vec keys = drmA.keys; + /** + * MRTask to execute bmf on partitions. Partitions are + * made accessible to bmf in the form of H2OBlockMatrix. + */ class MRTaskBMF extends MRTask<MRTaskBMF> { Serializable bmf; Vec labels; MRTaskBMF(Object _bmf, Vec _labels) { - /* BlockMapFun does not implement Serializable, - but Scala closures are _always_ Serializable. - - So receive the object as a plain Object (else - compilation fails) and typcast it with conviction, - that Scala always tags the actually generated - closure functions with Serializable. - */ + // BlockMapFun does not implement Serializable, + // but Scala closures are _always_ Serializable. + // + // So receive the object as a plain Object (else + // compilation fails) and typecast it with conviction, + // that Scala always tags the actually generated + // closure functions with Serializable.
bmf = (Serializable)_bmf; labels = _labels; } + /** Create H2OBlockMatrix from the partition */ private Matrix blockify(Chunk chks[]) { return new H2OBlockMatrix(chks); } + /** Ingest the output of bmf into the output partition */ private void deblockify(Matrix out, NewChunk ncs[]) { // assert (out.colSize() == ncs.length) for (int c = 0; c < out.columnSize(); c++) { @@ -67,36 +86,34 @@ public class MapBlock { } } - /* - Input: - chks.length == A.numCols() - - Output: - ncs.length == (A.numCols() + 1) if String keyed - (A.numCols() + 0) if Int or Long keyed - - First A.numCols() ncs[] elements are fed back the output - of bmf() output's _2 in deblockify() - - If String keyed, then MapBlockHelper.exec() would have - filled in the Strings into ncs[ncol] already - */ + // Input: + // chks.length == A.numCols() + // + // Output: + // ncs.length == (A.numCols() + 1) if String keyed + // (A.numCols() + 0) if Int or Long keyed + // + // First A.numCols() ncs[] elements are fed back the output + // of bmf() output's _2 in deblockify() + // + // If String keyed, then MapBlockHelper.exec() would have + // filled in the Strings into ncs[ncol] already + // public void map(Chunk chks[], NewChunk ncs[]) { long start = chks[0].start(); - NewChunk nclabel = is_r_str ? ncs[ncs.length - 1] : null; + NewChunk nclabel = isRstr ? ncs[ncs.length - 1] : null; deblockify(MapBlockHelper.exec(bmf, blockify(chks), start, labels, nclabel, k, r), ncs); // assert chks[i]._len == ncs[j]._len } } - int ncol_res = ncol + (is_r_str ? 1 : 0); - Frame fmap = new MRTaskBMF(bmf, keys).doAll(ncol_res, A).outputFrame(null, null); + int ncolRes = ncol + (isRstr ? 1 : 0); + Frame fmap = new MRTaskBMF(bmf, keys).doAll(ncolRes, A).outputFrame(null, null); Vec vmap = null; - if (is_r_str) { - /* If output was String keyed, then the last Vec in fmap is the String vec. 
- If so, peel it out into a separate Vec (vmap) and set fmap to be the - Frame with just the first ncol Vecs - */ + if (isRstr) { + // If output was String keyed, then the last Vec in fmap is the String vec. + // If so, peel it out into a separate Vec (vmap) and set fmap to be the + // Frame with just the first ncol Vecs vmap = fmap.vecs()[ncol]; fmap = new Frame(Arrays.copyOfRange(fmap.vecs(), 0, ncol)); } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Par.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Par.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Par.java index 036bfc6..54db5a1 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Par.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Par.java @@ -27,32 +27,42 @@ import water.parser.ValueString; import org.apache.mahout.h2obindings.H2OHelper; import org.apache.mahout.h2obindings.drm.H2ODrm; +/** + * Parallelize operator. + */ public class Par { + /** + * (re)Parallelize DRM data according to new partitioning hints. + * + * @param drmA Input DRM containing data. + * @param min Hint of minimum number of partitions to parallelize, if not -1. + * @param exact Hint of exact number of partitions to parallelize, if not -1. + * @return new DRM holding the same data but parallelized according to new hints. + */ public static H2ODrm exec(H2ODrm drmA, int min, int exact) { final Frame frin = drmA.frame; final Vec vin = drmA.keys; - /* First create a new empty Frame with the required partitioning */ + // First create a new empty Frame with the required partitioning Frame frout = H2OHelper.emptyFrame(frin.numRows(), frin.numCols(), min, exact); Vec vout = null; if (vin != null) { - /* If String keyed, then run an MRTask on the new frame, and also - creat yet another 1-column newer frame for the re-orged String keys. 
- The new String Vec will therefore be similarly partitioned as the - new Frame. - - vout is finally collected by calling anyVec() on outputFrame(), - as it is the only column in the output frame. - */ + // If String keyed, then run an MRTask on the new frame, and also + // create yet another 1-column newer frame for the re-orged String keys. + // The new String Vec will therefore be similarly partitioned as the + // new Frame. + // + // vout is finally collected by calling anyVec() on outputFrame(), + // as it is the only column in the output frame. vout = new MRTask() { public void map(Chunk chks[], NewChunk nc) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vec vins[] = frin.vecs(); long start = chks[0].start(); ValueString vstr = new ValueString(); - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { for (int c = 0; c < chks.length; c++) { chks[c].set0(r, vins[c].at(start + r)); } @@ -61,16 +71,15 @@ public class Par { } }.doAll(1, frout).outputFrame(null, null).anyVec(); } else { - /* If not String keyed, then run and MRTask on the new frame, and - just pull in right elements from frin - */ + // If not String keyed, then run an MRTask on the new frame, and + // just pull in right elements from frin new MRTask() { public void map(Chunk chks[]) { - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); Vec vins[] = frin.vecs(); long start = chks[0].start(); - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { for (int c = 0; c < chks.length; c++) { chks[c].set0(r, vins[c].at(start + r)); } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Rbind.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Rbind.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Rbind.java index 4ee5fc9..57eb560 100644 ---
a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Rbind.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/Rbind.java @@ -27,18 +27,26 @@ import water.parser.ValueString; import org.apache.mahout.h2obindings.H2OHelper; import org.apache.mahout.h2obindings.drm.H2ODrm; +/** + * R-like rbind like operator, on two DRMs + */ public class Rbind { - /* R's rbind like operator, on DrmA and DrmB */ + /** + * Combine the rows of two DRMs A and B to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param drmB DRM representing matrix B. + * @return new DRM containing rows of B below A. + */ public static H2ODrm exec(H2ODrm drmA, H2ODrm drmB) { final Frame fra = drmA.frame; final Vec keysa = drmA.keys; final Frame frb = drmB.frame; final Vec keysb = drmB.keys; - /* Create new frame and copy A's data at the top, and B's data below. - Create the frame in the same VectorGroup as A, so A's data does not - cross the wire during copy. B's data could potentially cross the wire. - */ + // Create new frame and copy A's data at the top, and B's data below. + // Create the frame in the same VectorGroup as A, so A's data does not + // cross the wire during copy. B's data could potentially cross the wire. 
Frame frbind = H2OHelper.emptyFrame(fra.numRows() + frb.numRows(), fra.numCols(), -1, -1, fra.anyVec().group()); Vec keys = null; @@ -50,10 +58,10 @@ public class Rbind { long A_rows = fra.numRows(); long B_rows = frb.numRows(); long start = chks[0].start(); - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); ValueString vstr = new ValueString(); - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { for (int c = 0; c < chks.length; c++) { if (r + start < A_rows) { chks[c].set0(r, A_vecs[c].at(r + start)); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/RowRange.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/RowRange.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/RowRange.java index 9af5feb..e6bb778 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/RowRange.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/RowRange.java @@ -28,28 +28,36 @@ import water.parser.ValueString; import org.apache.mahout.h2obindings.drm.H2ODrm; +/** + * Filter operation + */ public class RowRange { - /* Filter operation */ + /** + * Filter rows from input DRM, to include only row indices included in R. + * + * @param drmA Input DRM. + * @param R Range object specifying the start and end row numbers to filter. + * @return new DRM with just the filtered rows. + */ public static H2ODrm exec(H2ODrm drmA, final Range R) { Frame A = drmA.frame; Vec keys = drmA.keys; - /* Run a filtering MRTask on A. If row number falls within R.start() and - R.end(), then the row makes it into the output - */ + // Run a filtering MRTask on A.
If row number falls within R.start() and + // R.end(), then the row makes it into the output Frame Arr = new MRTask() { public void map(Chunk chks[], NewChunk ncs[]) { - int chunk_size = chks[0].len(); - long chunk_start = chks[0].start(); + int chunkSize = chks[0].len(); + long chunkStart = chks[0].start(); - /* First check if the entire chunk even overlaps with R */ - if (chunk_start > R.end() || (chunk_start + chunk_size) < R.start()) { + // First check if the entire chunk even overlaps with R + if (chunkStart > R.end() || (chunkStart + chunkSize) < R.start()) { return; } - /* This chunk overlaps, filter out just the overlapping rows */ - for (int r = 0; r < chunk_size; r++) { - if (!R.contains(chunk_start + r)) { + // This chunk overlaps, filter out just the overlapping rows + for (int r = 0; r < chunkSize; r++) { + if (!R.contains(chunkStart + r)) { continue; } @@ -61,20 +69,19 @@ public class RowRange { }.doAll(A.numCols(), A).outputFrame(null, null); Vec Vrr = (keys == null) ? null : new MRTask() { - /* This is a String keyed DRM. Do the same thing as above, - but this time just one column of Strings. - */ + // This is a String keyed DRM. Do the same thing as above, + // but this time just one column of Strings. 
public void map(Chunk chk, NewChunk nc) { - int chunk_size = chk.len(); - long chunk_start = chk.start(); + int chunkSize = chk.len(); + long chunkStart = chk.start(); ValueString vstr = new ValueString(); - if (chunk_start > R.end() || (chunk_start + chunk_size) < R.start()) { + if (chunkStart > R.end() || (chunkStart + chunkSize) < R.start()) { return; } - for (int r = 0; r < chunk_size; r++) { - if (!R.contains(chunk_start + r)) { + for (int r = 0; r < chunkSize; r++) { + if (!R.contains(chunkStart + r)) { continue; } http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/java/org/apache/mahout/h2obindings/ops/TimesRightMatrix.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/TimesRightMatrix.java b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/TimesRightMatrix.java index ab185d8..6d96586 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/ops/TimesRightMatrix.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/ops/TimesRightMatrix.java @@ -29,8 +29,17 @@ import water.fvec.Vec; import water.fvec.Chunk; import water.fvec.NewChunk; +/** + * Multiply DRM with in-core Matrix + */ public class TimesRightMatrix { - /* Multiple with in-core Matrix */ + /** + * Multiply a DRM with an in-core Matrix to create a new DRM. + * + * @param drmA DRM representing matrix A. + * @param B in-core Mahout Matrix. + * @return new DRM containing drmA times B.
+ */ public static H2ODrm exec(H2ODrm drmA, Matrix B) { Frame A = drmA.frame; Vec keys = drmA.keys; @@ -45,21 +54,21 @@ public class TimesRightMatrix { return new H2ODrm(AinCoreB, keys); } - /* - Multiply Frame A with in-core diagonal Matrix (whose diagonal Vector is d) - - A.numCols() == d.size() - */ + /** + * Multiply Frame A with in-core diagonal Matrix (whose diagonal Vector is d) + * + * A.numCols() == d.size() + */ private static Frame execDiagonal(final Frame A, Vector d) { final H2OBCast<Vector> bd = new H2OBCast<Vector>(d); return new MRTask() { public void map(Chunk chks[], NewChunk ncs[]) { Vector D = bd.value(); - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); for (int c = 0; c < ncs.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = (chks[c].at0(r) * D.getQuick(c)); ncs[c].addNum(v); } @@ -68,21 +77,21 @@ public class TimesRightMatrix { }.doAll(d.size(), A).outputFrame(null, null); } - /* - Multiply Frame A with in-core Matrix b - - A.numCols() == b.rowSize() - */ + /** + * Multiply Frame A with in-core Matrix b + * + * A.numCols() == b.rowSize() + */ private static Frame execCommon(final Frame A, Matrix b) { final H2OBCast<Matrix> bb = new H2OBCast<Matrix>(b); return new MRTask() { public void map(Chunk chks[], NewChunk ncs[]) { Matrix B = bb.value(); - int chunk_size = chks[0].len(); + int chunkSize = chks[0].len(); for (int c = 0; c < ncs.length; c++) { - for (int r = 0; r < chunk_size; r++) { + for (int r = 0; r < chunkSize; r++) { double v = 0; for (int i = 0; i < chks.length; i++) { v += (chks[i].at0(r) * B.getQuick(i, c)); http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 
a8485af..54d950b 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -24,6 +24,7 @@ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.h2obindings.ops._ import org.apache.mahout.h2obindings.drm._ +/** H2O specific non-DRM operations */ object H2OEngine extends DistributedEngine { def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = H2OHelper.colMeans(drm.h2odrm.frame) @@ -37,36 +38,49 @@ object H2OEngine extends DistributedEngine { def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = H2OHelper.nonZeroCnt(drm.h2odrm.frame) + /** Broadcast support */ def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = new H2OBCast(m) + /** Broadcast support */ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = new H2OBCast(v) + /** + * Load DRM from hdfs (as in Mahout DRM format) + * + * @param path Path to DRM file + * @param parMin Hint of minimum number of partitions to split while distributing + * + * @return DRM[Any] where Any is automatically translated to value type + */ def drmFromHDFS(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc) + /** This creates an empty DRM with specified number of partitions and cardinality. */ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] = new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc) def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Long] = new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc) + /** Parallelize in-core matrix as H2O distributed matrix, using row ordinal indices as data set keys. 
*/ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] = new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc) + /** Parallelize in-core matrix as H2O distributed matrix, using row labels as data set keys. */ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[String] = new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc) def toPhysical[K:ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = new CheckpointedDrmH2O[K](tr2phys(plan), plan.context) - // H2O specific - + /** Eagerly evaluate operator graph into an H2O DRM */ private def tr2phys[K: ClassTag](oper: DrmLike[K]): H2ODrm = { oper match { case OpAtAnyKey(_) => throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") + // Linear algebra operators case op@OpAt(a) => At.exec(tr2phys(a)(op.classTagA)) case op@OpABt(a, b) => ABt.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) case op@OpAtB(a, b) => AtB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) @@ -74,13 +88,13 @@ object H2OEngine extends DistributedEngine { case op@OpAx(a, v) => Ax.exec(tr2phys(a)(op.classTagA), v) case op@OpAtx(a, v) => Atx.exec(tr2phys(a)(op.classTagA), v) case op@OpAewB(a, b, opId) => AewB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB), opId) + case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(op.classTagA), s, opId) + case op@OpTimesRightMatrix(a, m) => TimesRightMatrix.exec(tr2phys(a)(op.classTagA), m) // Non arithmetic case op@OpCbind(a, b) => Cbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) case op@OpRbind(a, b) => Rbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(op.classTagA), s, opId) case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r) - case
op@OpTimesRightMatrix(a, m) => TimesRightMatrix.exec(tr2phys(a)(op.classTagA), m) - // Custom operators, we just execute them + // Custom operators case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf, (blockOp.classTagK == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.classTagK) case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e) http://git-wip-us.apache.org/repos/asf/mahout/blob/2d1b0bf6/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala index 58a9a79..15af5e2 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala @@ -8,18 +8,40 @@ import org.apache.mahout.h2obindings._ import scala.reflect._ -/** H2O-specific optimizer-checkpointed DRM. */ +/** + * H2O-specific optimizer-checkpointed DRM. + * + * @param h2odrm Underlying Frame and optional label Vec to wrap around + * @param context Distributed context to the H2O Cloud + * @tparam K Matrix key type + */ class CheckpointedDrmH2O[K: ClassTag]( val h2odrm: H2ODrm, val context: DistributedContext ) extends CheckpointedDrm[K] { + /** + * Collect DRM into an in-core Matrix + * + * If key in DRM is Int, then matrix is collected using key as row index. + * Otherwise, order of rows in result is undefined but key.toString is applied + * as rowLabelBindings of the in-core matrix. + */ def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm) + + /* XXX: call frame.remove */ def uncache(): this.type = this + /** + * Persist DRM to disk over HDFS in Mahout DRM format.
+ */ def writeDRM(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm) + /** + * Action operator - Eagerly evaluate the lazily built operator graph to create + * a CheckpointedDrm + */ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this def ncol: Int = h2odrm.frame.numCols
