MAHOUT-1736: Implement allreduceBlock() on H2O This closes apache/mahout#143
Signed-off-by: Anand Avati <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/31ec0197 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/31ec0197 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/31ec0197 Branch: refs/heads/mahout-0.10.x Commit: 31ec01973fe6a3bcdfd453042471142000cdd36d Parents: 60571af Author: Anand Avati <[email protected]> Authored: Tue Jun 23 20:23:53 2015 -0700 Committer: Anand Avati <[email protected]> Committed: Wed Jun 24 11:29:19 2015 -0700 ---------------------------------------------------------------------- .../apache/mahout/h2obindings/H2OHelper.java | 32 ++++++++++++++++++++ .../apache/mahout/h2obindings/H2OEngine.scala | 2 +- 2 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/31ec0197/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java index 0fae5a8..c9d91f9 100644 --- a/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/H2OHelper.java @@ -33,14 +33,19 @@ import water.util.ArrayUtils; import java.util.Map; import java.util.HashMap; +import java.io.Serializable; import org.apache.mahout.h2obindings.drm.H2ODrm; +import org.apache.mahout.h2obindings.drm.H2OBCast; // for makeEmptyStrVec import water.Key; import water.DKV; import water.fvec.CStrChunk; +import scala.Function1; +import scala.Function2; + /** * Collection of helper methods for H2O backend. */ @@ -437,4 +442,31 @@ public class H2OHelper { public static H2ODrm emptyDrm(long nrow, int ncol, int minHint, int exactHint) { return new H2ODrm(emptyFrame(nrow, ncol, minHint, exactHint)); } + + public static Matrix allreduceBlock(H2ODrm drmA, Object bmfn, Object rfn) { + class MRTaskMR extends MRTask<MRTaskMR> { + H2OBCast<Matrix> bmf_out; + Serializable bmf; + Serializable rf; + + public MRTaskMR(Object _bmf, Object _rf) { + bmf = (Serializable) _bmf; + rf = (Serializable) _rf; + } + + @Override + public void map(Chunk chks[]) { + Function1 f = (Function1) bmf; + bmf_out = new H2OBCast((Matrix)f.apply(new scala.Tuple2(null, new H2OBlockMatrix(chks)))); + } + + @Override + public void reduce(MRTaskMR that) { + Function2 f = (Function2) rf; + bmf_out = new H2OBCast((Matrix)f.apply(this.bmf_out.value(), that.bmf_out.value())); + } + } + + return new MRTaskMR(bmfn, rfn).doAll(drmA.frame).bmf_out.value(); + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/31ec0197/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 4236b95..bcf3507 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -168,7 +168,7 @@ object H2OEngine extends DistributedEngine { * */ override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc) - : Matrix = ??? + : Matrix = H2OHelper.allreduceBlock(drm.h2odrm, bmf, rf) /** * TODO: implement this please.
