MAHOUT-1660 MAHOUT-1713 MAHOUT-1714 MAHOUT-1715 MAHOUT-1716 MAHOUT-1717 
MAHOUT-1718 MAHOUT-1719 MAHOUT-1720 MAHOUT-1721 MAHOUT-1722 MAHOUT-1723 
MAHOUT-1724 MAHOUT-1725 MAHOUT-1726 MAHOUT-1727 MAHOUT-1728 MAHOUT-1729 
MAHOUT-1730 MAHOUT-1731 MAHOUT-1732
Cumulative patch for the above issues. Closes apache/mahout#135

Squashed commit of the following:

commit c59bf8a21e1ad77dee80730772d2184b3f28a495
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 8 18:11:57 2015 -0700

    handling degenerate matrix cases for rbind, cbind, and serialization (0 
columns or rows)

commit 56b735e137355e174facffd409d6456360c2f8e7
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 8 16:58:34 2015 -0700

    Inserting back the testing framework artifact being built. Need this as a 
dependency
    in subordinate projects that do method testing as well.

commit 7e6ce766d06c5a2337dd9b08df7c9fa37bd9a9c8
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 8 10:22:53 2015 -0700

    adding "final" for logger per comment on public PR

commit e42bcedf8521b89c7583f8e7e299c2be0c2a8de2
Author: Dmitriy Lyubimov <[email protected]>
Date:   Tue Jun 2 12:24:30 2015 -0700

    final fixes in h20.
    fixing @deprecated warnings in atb

commit 00fb618ad0ef0e5b8aac30c88b23d2e9325ea8f8
Author: Dmitriy Lyubimov <[email protected]>
Date:   Tue Jun 2 12:08:13 2015 -0700

    h20 stuff

commit f4e15506ed2497bc2e179e3ded9ca399fd826d15
Author: Dmitriy Lyubimov <[email protected]>
Date:   Tue Jun 2 11:55:30 2015 -0700

    restoring merge errors in h2o module, nothing is touched here.

commit 1b892de589bccf03c41c6b2e49493472e6bd1d52
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 1 18:44:21 2015 -0700

    Picking up missing changes on both sides in spark module.
    TODO: Pat's similarity driver tests fail, seems, on some degenerate 
splitting in optimizer. Need to take a look.

commit 3422046b94c03d43a91f091e38532339cf890351
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 1 18:13:03 2015 -0700

    Adding missing change. uncommenting performance in-core tests.

commit 7aa5de5431ad01c37e8069956287194c97b37b06
Author: Dmitriy Lyubimov <[email protected]>
Date:   Mon Jun 1 17:54:17 2015 -0700

    Initial merge with ora private-review branch. Stuff compiles up to h2o 
(which needs to be added some unimplemented stuff) and ssvd tests
    are failing in math-scala module due to lack of matrix flavor on mmul. They 
are not failing in private branch though -- some changes still
    have not been merged?

    Most changes i care about seems to be there though.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8a6b805a
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8a6b805a
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8a6b805a

Branch: refs/heads/mahout-0.10.x
Commit: 8a6b805a3c15080f28be050a83b1ad26a60f21e6
Parents: e6d24b9
Author: Dmitriy Lyubimov <[email protected]>
Authored: Wed Jun 10 17:08:37 2015 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Wed Jun 10 17:08:37 2015 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  41 +++
 bin/mahout                                      |   4 +-
 .../apache/mahout/h2obindings/drm/H2OBCast.java |  12 +
 .../apache/mahout/h2obindings/H2OEngine.scala   |  66 ++--
 .../org/apache/mahout/logging/package.scala     |  73 ++++
 .../apache/mahout/math/decompositions/DQR.scala |   9 +-
 .../mahout/math/decompositions/DSSVD.scala      |  19 +-
 .../mahout/math/decompositions/SSVD.scala       |   2 +-
 .../org/apache/mahout/math/drm/BCast.scala      |   3 +-
 .../mahout/math/drm/CheckpointedOps.scala       |   7 +
 .../mahout/math/drm/DistributedEngine.scala     | 125 ++++---
 .../mahout/math/drm/DrmDoubleScalarOps.scala    |   8 +-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala |   7 +-
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  55 ++-
 .../math/drm/logical/AbstractUnaryOp.scala      |   2 +-
 .../math/drm/logical/CheckpointAction.scala     |   3 +-
 .../mahout/math/drm/logical/OpAewScalar.scala   |   6 +-
 .../math/drm/logical/OpAewUnaryFunc.scala       |  47 +++
 .../math/drm/logical/OpAewUnaryFuncFusion.scala |  62 ++++
 .../mahout/math/drm/logical/OpCbind.scala       |   2 +-
 .../mahout/math/drm/logical/OpCbindScalar.scala |  37 ++
 .../mahout/math/drm/logical/OpMapBlock.scala    |   2 +-
 .../mahout/math/drm/logical/TEwFunc.scala       |  37 ++
 .../org/apache/mahout/math/drm/package.scala    |  50 ++-
 .../math/scalabindings/DoubleScalarOps.scala    |  42 ---
 .../apache/mahout/math/scalabindings/MMul.scala | 295 ++++++++++++++++
 .../mahout/math/scalabindings/MatrixOps.scala   |  87 ++++-
 .../scalabindings/RLikeDoubleScalarOps.scala    |  63 ++++
 .../math/scalabindings/RLikeMatrixOps.scala     |  77 ++++-
 .../mahout/math/scalabindings/RLikeOps.scala    |   4 +-
 .../math/scalabindings/RLikeTimesOps.scala      |  28 --
 .../math/scalabindings/RLikeVectorOps.scala     |  29 +-
 .../mahout/math/scalabindings/VectorOps.scala   |  45 ++-
 .../mahout/math/scalabindings/package.scala     |  81 ++++-
 .../org/apache/mahout/util/IOUtilsScala.scala   |  64 ++++
 .../mahout/math/drm/DrmLikeOpsSuiteBase.scala   |  20 ++
 .../mahout/math/drm/DrmLikeSuiteBase.scala      |   3 +-
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  |  94 ++++-
 .../math/scalabindings/MatrixOpsSuite.scala     |  33 +-
 .../scalabindings/RLikeMatrixOpsSuite.scala     | 276 +++++++++++++++
 .../math/scalabindings/VectorOpsSuite.scala     |  19 +-
 .../org/apache/mahout/math/AbstractMatrix.java  |  24 +-
 .../org/apache/mahout/math/ConstantVector.java  |   5 +
 .../apache/mahout/math/DelegatingVector.java    |   5 +
 .../org/apache/mahout/math/DenseMatrix.java     |   9 +-
 .../mahout/math/DenseSymmetricMatrix.java       |   2 +
 .../org/apache/mahout/math/DenseVector.java     |   5 +
 .../org/apache/mahout/math/DiagonalMatrix.java  |  14 +
 .../math/FileBasedSparseBinaryMatrix.java       |   5 +
 .../mahout/math/FunctionalMatrixView.java       |   9 +
 .../java/org/apache/mahout/math/Matrices.java   |  18 +-
 .../java/org/apache/mahout/math/Matrix.java     |   7 +
 .../apache/mahout/math/MatrixVectorView.java    |   5 +
 .../java/org/apache/mahout/math/MatrixView.java |   6 +
 .../org/apache/mahout/math/NamedVector.java     |   5 +
 .../apache/mahout/math/PermutedVectorView.java  |   5 +
 .../mahout/math/RandomAccessSparseVector.java   |   5 +
 .../math/SequentialAccessSparseVector.java      |   7 +
 .../apache/mahout/math/SparseColumnMatrix.java  |  20 +-
 .../org/apache/mahout/math/SparseMatrix.java    |  30 +-
 .../org/apache/mahout/math/SparseRowMatrix.java |   7 +
 .../mahout/math/TransposedMatrixView.java       | 147 ++++++++
 .../org/apache/mahout/math/UpperTriangular.java |   9 +
 .../java/org/apache/mahout/math/Vector.java     |   8 +
 .../org/apache/mahout/math/VectorIterable.java  |   4 +
 .../java/org/apache/mahout/math/VectorView.java |   7 +-
 .../org/apache/mahout/math/flavor/BackEnum.java |  26 ++
 .../apache/mahout/math/flavor/MatrixFlavor.java |  82 +++++
 .../math/flavor/TraversingStructureEnum.java    |  48 +++
 .../org/apache/mahout/math/MatricesTest.java    |   4 +-
 .../math/hadoop/DistributedRowMatrix.java       |   5 +
 .../stochasticsvd/qr/GivensThinSolver.java      |   5 +
 .../sparkbindings/shell/MahoutSparkILoop.scala  |  15 +-
 spark/pom.xml                                   |  16 +
 .../org/apache/mahout/common/DrmMetadata.scala  |  17 +
 .../org/apache/mahout/common/HDFSUtil.scala     |   4 +-
 .../apache/mahout/common/Hadoop1HDFSUtil.scala  |  18 +-
 .../mahout/sparkbindings/SparkEngine.scala      | 227 +++++++++----
 .../apache/mahout/sparkbindings/blas/ABt.scala  | 200 +++++++++--
 .../apache/mahout/sparkbindings/blas/AewB.scala |  75 +++-
 .../mahout/sparkbindings/blas/AinCoreB.scala    |  16 +-
 .../apache/mahout/sparkbindings/blas/At.scala   |  15 +-
 .../apache/mahout/sparkbindings/blas/AtA.scala  | 247 +++++++++-----
 .../apache/mahout/sparkbindings/blas/AtB.scala  | 339 ++++++++++++++++---
 .../apache/mahout/sparkbindings/blas/Ax.scala   |  24 +-
 .../mahout/sparkbindings/blas/CbindAB.scala     |  35 +-
 .../mahout/sparkbindings/blas/DrmRddOps.scala   |   2 +
 .../mahout/sparkbindings/blas/MapBlock.scala    |  15 +-
 .../apache/mahout/sparkbindings/blas/Par.scala  |  74 ++--
 .../mahout/sparkbindings/blas/RbindAB.scala     |   8 +-
 .../mahout/sparkbindings/blas/Slicing.scala     |   2 +-
 .../mahout/sparkbindings/blas/package.scala     | 174 +++++++++-
 .../drm/CheckpointedDrmSpark.scala              |  41 ++-
 .../drm/CheckpointedDrmSparkOps.scala           |   2 +-
 .../mahout/sparkbindings/drm/DrmRddInput.scala  |  18 +-
 .../mahout/sparkbindings/drm/SparkBCast.scala   |   2 +
 .../mahout/sparkbindings/drm/package.scala      |  23 +-
 .../io/GenericMatrixKryoSerializer.scala        | 189 +++++++++++
 .../io/MahoutKryoRegistrator.scala              |  28 +-
 .../io/UnsupportedSerializer.scala              |  31 ++
 .../sparkbindings/io/VectorKryoSerializer.scala | 252 ++++++++++++++
 .../apache/mahout/sparkbindings/package.scala   | 118 +++----
 .../sparkbindings/SparkBindingsSuite.scala      |  12 +-
 .../mahout/sparkbindings/blas/BlasSuite.scala   |   4 +-
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     |  17 +-
 .../sparkbindings/drm/RLikeDrmOpsSuite.scala    |  63 ++++
 .../mahout/sparkbindings/io/IOSuite.scala       | 195 +++++++++++
 .../test/DistributedSparkSuite.scala            |  15 +-
 .../test/LoggerConfiguration.scala              |   2 +-
 109 files changed, 4310 insertions(+), 702 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 89c2cbc..dd65b0e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,47 @@ Mahout Change Log
 
 Release 0.10.2 - unreleased
 
+  MAHOUT-1660: Hadoop1HDFSUtil.readDRMHEader should be taking Hadoop conf 
(dlyubimov)
+
+  MAHOUT-1713: Performance and parallelization improvements for AB', A'B, A'A 
spark physical operators (dlyubimov)
+
+  MAHOUT-1714: Add MAHOUT_OPTS environment when running Spark shell (dlyubimov)
+
+  MAHOUT-1715: Closeable API for broadcast tensors (dlyubimov)
+
+  MAHOUT-1716: Scala logging style (dlyubimov)
+
+  MAHOUT-1717: allreduceBlock() operator api and Spark implementation 
(dlyubimov)
+
+  MAHOUT-1718: Support for conversion of any type-keyed DRM into 
ordinally-keyed DRM (dlyubimov)
+
+  MAHOUT-1719: Unary elementwise function operator and function fusions 
(dlyubimov)
+
+  MAHOUT-1720: Support 1 cbind X, X cbind 1 etc. for both Matrix and DRM 
(dlyubimov)
+
+  MAHOUT-1721: rowSumsMap() summary for non-int-keyed DRMs (dlyubimov)
+
+  MAHOUT-1722: DRM row sampling api (dlyubimov)
+
+  MAHOUT-1723: Optional structural "flavor" abstraction for in-core matrices 
(dlyubimov)
+
+  MAHOUT-1724: Optimizations of matrix-matrix in-core multiplication based on 
structural flavors (dlyubimov)
+
+  MAHOUT-1725: elementwise power operator ^ (dlyubimov)
+
+  MAHOUT-1726: R-like vector concatenation operator (dlyubimov)
+
+  MAHOUT-1727: Elementwise analogues of scala.math functions for tensor types 
(dlyubimov)
+
+  MAHOUT-1728: In-core functional assignments (dlyubimov)
+
+  MAHOUT-1729: Straighten out behavior of Matrix.iterator() and 
iterateNonEmpty() (dlyubimov)
+
+  MAHOUT-1730: New mutable transposition view for in-core matrices (dlyubimov)
+
+  MAHOUT-1731: Deprecate SparseColumnMatrix (dlyubimov)
+
+  MAHOUT-1732: Native support for kryo serialization of tensor types 
(dlyubimov)
 
 Release 0.10.1 - 2015-05-31
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/bin/mahout
----------------------------------------------------------------------
diff --git a/bin/mahout b/bin/mahout
index ee0b918..24f01ba 100755
--- a/bin/mahout
+++ b/bin/mahout
@@ -254,12 +254,10 @@ fi
 # restore ordinary behaviour
 unset IFS
 
-
-
 case "$1" in
   (spark-shell)
     save_stty=$(stty -g 2>/dev/null);
-    "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" 
"org.apache.mahout.sparkbindings.shell.Main" $@
+    "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS -classpath "$CLASSPATH" 
"org.apache.mahout.sparkbindings.shell.Main" $@
     stty sane; stty $save_stty
     ;;
   # Spark CLI drivers go here

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java
----------------------------------------------------------------------
diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java 
b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java
index 523a771..ebcc626 100644
--- a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java
+++ b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java
@@ -118,4 +118,16 @@ public class H2OBCast<T> implements BCast<T>, Serializable 
{
     }
     return ret;
   }
+
+  /**
+   * Stop broadcasting when called on driver side. Release any network 
resources.
+   *
+   */
+  @Override
+  public void close() throws IOException {
+
+    // TODO: review this. It looks like it is not really a broadcast mechanism 
but rather just a
+    // serialization wrapper. In which case it doesn't hold any network 
resources.
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 173d5a0..e0ac302 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -26,9 +26,13 @@ import org.apache.mahout.math.drm.logical._
 import org.apache.mahout.h2obindings.ops._
 import org.apache.mahout.h2obindings.drm._
 import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil}
+import org.apache.mahout.logging._
 
 /** H2O specific non-DRM operations */
 object H2OEngine extends DistributedEngine {
+
+  private final implicit val log = getLog(H2OEngine.getClass)
+
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
@@ -119,40 +123,64 @@ object H2OEngine extends DistributedEngine {
   abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val 
rowIDs: BiDictionary, val columnIDs: BiDictionary)
     extends IndexedDataset {}
 
-    /**
-   * reads an IndexedDatasetH2O from default text delimited files
+  /**
+   * Reads an IndexedDatasetH2O from default text delimited files
    * @todo unimplemented
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
    * @return
    */
   def indexedDatasetDFSRead(src: String,
-      schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetH2O = {
-    // should log a warning when this is built but no logger here, can an H2O 
contributor help with this
-    println("Warning: unimplemented indexedDatasetDFSReadElements." )
-    throw new UnsupportedOperationException("IndexedDatasetH2O is not 
implemented so can't be read.")
-    null.asInstanceOf[IndexedDatasetH2O]
+                            schema: Schema = DefaultIndexedDatasetReadSchema,
+                            existingRowIDs: Option[BiDictionary] = None)
+                           (implicit sc: DistributedContext):
+  IndexedDatasetH2O = {
+
+    error("Unimplemented indexedDatasetDFSReadElements.")
+
+    ???
   }
 
   /**
-   * reads an IndexedDatasetH2O from default text delimited files
+   * Reads an IndexedDatasetH2O from default text delimited files
    * @todo unimplemented
    * @param src a comma separated list of URIs to read from
    * @param schema how the text file is formatted
    * @return
    */
   def indexedDatasetDFSReadElements(src: String,
-      schema: Schema = DefaultIndexedDatasetReadSchema,
-      existingRowIDs: Option[BiDictionary] = None)
-      (implicit sc: DistributedContext):
-    IndexedDatasetH2O = {
-    // should log a warning when this is built but no logger here, can an H2O 
contributor help with this
-    println("Warning: unimplemented indexedDatasetDFSReadElements." )
-    throw new UnsupportedOperationException("IndexedDatasetH2O is not 
implemented so can't be read by elements.")
-    null.asInstanceOf[IndexedDatasetH2O]
+                                    schema: Schema = 
DefaultIndexedDatasetReadSchema,
+                                    existingRowIDs: Option[BiDictionary] = 
None)
+                                   (implicit sc: DistributedContext): 
IndexedDatasetH2O = {
+
+    error("Unimplemented indexedDatasetDFSReadElements.")
+
+    ???
   }
 
+  /**
+   * Optional engine-specific all reduce tensor operation.
+   *
+   * TODO: implement this please.
+   *
+   */
+  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: 
BlockMapFunc2[K], rf: BlockReduceFunc)
+  : Matrix = ???
+
+  /**
+   * TODO: implement this please.
+   */
+  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, 
replacement: Boolean): Matrix = ???
+
+  /**
+   * (Optional) Sampling operation. Consistent with Spark semantics of the 
same.
+   * TODO: implement this please.
+   */
+  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, 
replacement: Boolean): DrmLike[K] = ???
+
+  /**
+   * TODO: implement this please.
+   */
+  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean)
+  : (DrmLike[Int], Option[DrmLike[K]]) = ???
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/logging/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/logging/package.scala 
b/math-scala/src/main/scala/org/apache/mahout/logging/package.scala
new file mode 100644
index 0000000..15aa909
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/logging/package.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout
+
+import org.apache.log4j.{Level, Priority, Logger}
+
+package object logging {
+
+  /** Compute `expr` if debug is on, only */
+  def debugDo[T](expr: => T)(implicit log: Logger): Option[T] = {
+    if (log.isDebugEnabled) Some(expr)
+    else None
+  }
+
+  /** Compute `expr` if trace is on, only */
+  def traceDo[T](expr: => T)(implicit log: Logger): Option[T] = {
+    if (log.isTraceEnabled) Some(expr) else None
+  }
+
+  /** Shorter, and lazy, versions of logging methods. Just declare log 
implicit. */
+  def debug(msg: => AnyRef)(implicit log: Logger) { if (log.isDebugEnabled) 
log.debug(msg) }
+
+  def debug(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if 
(log.isDebugEnabled()) log.debug(msg, t) }
+
+  /** Shorter, and lazy, versions of logging methods. Just declare log 
implicit. */
+  def trace(msg: => AnyRef)(implicit log: Logger) { if (log.isTraceEnabled) 
log.trace(msg) }
+
+  def trace(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if 
(log.isTraceEnabled()) log.trace(msg, t) }
+
+  def info(msg: => AnyRef)(implicit log: Logger) { if (log.isInfoEnabled) 
log.info(msg)}
+
+  def info(msg: => AnyRef, t:Throwable)(implicit log: Logger) { if 
(log.isInfoEnabled) log.info(msg,t)}
+
+  def warn(msg: => AnyRef)(implicit log: Logger) { if 
(log.isEnabledFor(Level.WARN)) log.warn(msg) }
+
+  def warn(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if 
(log.isEnabledFor(Level.WARN)) error(msg, t) }
+
+  def error(msg: => AnyRef)(implicit log: Logger) { if 
(log.isEnabledFor(Level.ERROR)) log.warn(msg) }
+
+  def error(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if 
(log.isEnabledFor(Level.ERROR)) error(msg, t) }
+
+  def fatal(msg: => AnyRef)(implicit log: Logger) { if 
(log.isEnabledFor(Level.FATAL)) log.fatal(msg) }
+
+  def fatal(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if 
(log.isEnabledFor(Level.FATAL)) log.fatal(msg, t) }
+
+  def getLog(name: String): Logger = Logger.getLogger(name)
+
+  def getLog(clazz: Class[_]): Logger = Logger.getLogger(clazz)
+
+  def mahoutLog :Logger = getLog("org.apache.mahout")
+
+  def setLogLevel(l:Level)(implicit log:Logger) = {
+    log.setLevel(l)
+  }
+
+  def setAdditivity(a:Boolean)(implicit log:Logger) = log.setAdditivity(a)
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
index 7caa3dd..866ee34 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -18,6 +18,7 @@
 package org.apache.mahout.math.decompositions
 
 import scala.reflect.ClassTag
+import org.apache.mahout.logging._
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
@@ -27,7 +28,7 @@ import org.apache.log4j.Logger
 
 object DQR {
 
-  private val log = Logger.getLogger(DQR.getClass)
+  private final implicit val log = getLog(DQR.getClass)
 
   /**
    * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then 
n should be pretty
@@ -41,19 +42,19 @@ object DQR {
   def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = 
true): (DrmLike[K], Matrix) = {
 
     if (drmA.ncol > 5000)
-      log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
+      warn("A is too fat. A'A must fit in memory and easily broadcasted.")
 
     implicit val ctx = drmA.context
 
     val AtA = (drmA.t %*% drmA).checkpoint()
     val inCoreAtA = AtA.collect
 
-    if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
+    trace("A'A=\n%s\n".format(inCoreAtA))
 
     val ch = chol(inCoreAtA)
     val inCoreR = (ch.getL cloned) t
 
-    if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
+    trace("R=\n%s\n".format(inCoreR))
 
     if (checkRankDeficiency && !ch.isPositiveDefinite)
       throw new IllegalArgumentException("R is rank-deficient.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
index 1abfb87..cecaec8 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -7,9 +7,12 @@ import RLikeOps._
 import org.apache.mahout.math.drm._
 import RLikeDrmOps._
 import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.logging._
 
 object DSSVD {
 
+  private final implicit val log = getLog(DSSVD.getClass)
+
   /**
    * Distributed Stochastic Singular Value decomposition algorithm.
    *
@@ -43,18 +46,22 @@ object DSSVD {
       case (keys, blockA) =>
         val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
         keys -> blockY
-    }
+    }.checkpoint()
 
-    var drmQ = dqrThin(drmY.checkpoint())._1
+    var drmQ = dqrThin(drmY)._1
     // Checkpoint Q if last iteration
     if (q == 0) drmQ = drmQ.checkpoint()
 
+    trace(s"dssvd:drmQ=${drmQ.collect}.")
+
     // This actually should be optimized as identically partitioned map-side 
A'B since A and Q should
     // still be identically partitioned.
     var drmBt = drmAcp.t %*% drmQ
     // Checkpoint B' if last iteration
     if (q == 0) drmBt = drmBt.checkpoint()
 
+    trace(s"dssvd:drmB'=${drmBt.collect}.")
+
     for (i <- 0  until q) {
       drmY = drmAcp %*% drmBt
       drmQ = dqrThin(drmY.checkpoint())._1
@@ -62,13 +69,17 @@ object DSSVD {
       if (i == q - 1) drmQ = drmQ.checkpoint()
 
       // This on the other hand should be inner-join-and-map A'B optimization 
since A and Q_i are not
-      // identically partitioned anymore.
+      // identically partitioned anymore.`
       drmBt = drmAcp.t %*% drmQ
       // Checkpoint B' if last iteration
       if (i == q - 1) drmBt = drmBt.checkpoint()
     }
 
-    val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt)
+    val mxBBt:Matrix = drmBt.t %*% drmBt
+
+    trace(s"dssvd: BB'=$mxBBt.")
+
+    val (inCoreUHat, d) = eigen(mxBBt)
     val s = d.sqrt
 
     // Since neither drmU nor drmV are actually computed until actually used, 
we don't need the flags

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
index 80385a3..e1b2f03 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
@@ -150,7 +150,7 @@ private[math] object SSVD {
     val c = s_q cross s_b
 
     // BB' computation becomes
-    val bbt = bt.t %*% bt -c - c.t +  (s_q cross s_q) * (xi dot xi)
+    val bbt = bt.t %*% bt - c - c.t +  (s_q cross s_q) * (xi dot xi)
 
     val (uhat, d) = eigen(bbt)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
index 850614457..b86e286 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
@@ -18,6 +18,7 @@
 package org.apache.mahout.math.drm
 
 /** Broadcast variable abstraction */
-trait BCast[T] {
+trait BCast[T] extends java.io.Closeable {
   def value:T
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
index 8c3911f..c43c6c7 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
@@ -20,6 +20,7 @@ package org.apache.mahout.math.drm
 import scala.reflect.ClassTag
 import org.apache.mahout.math._
 
+import org.apache.mahout.math.scalabindings.RLikeOps._
 
 /**
  * Additional experimental operations over CheckpointedDRM implementation. I 
will possibly move them up to
@@ -38,6 +39,12 @@ class CheckpointedOps[K: ClassTag](val drm: 
CheckpointedDrm[K]) {
   /** Column Means */
   def colMeans(): Vector = drm.context.colMeans(drm)
 
+  /** Optional engine-specific all reduce tensor operation. */
+  def allreduceBlock(bmf: BlockMapFunc2[K], rf: BlockReduceFunc = _ += _): 
Matrix =
+
+    drm.context.allreduceBlock(drm, bmf, rf)
+
+
   def norm():Double = drm.context.norm(drm)
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index bb6772a..519a127 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -19,16 +19,15 @@ package org.apache.mahout.math.drm
 
 import org.apache.mahout.math.indexeddataset._
 
-import scala.reflect.ClassTag
 import logical._
 import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
-import RLikeDrmOps._
 import DistributedEngine._
-import org.apache.mahout.math.scalabindings._
 import org.apache.log4j.Logger
 
+import scala.reflect.ClassTag
+
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
 
@@ -37,7 +36,7 @@ trait DistributedEngine {
    * introduce logical constructs (including engine-specific ones) that user 
DSL cannot even produce
    * per se.
    * <P>
-   *   
+   *
    * A particular physical engine implementation may choose to either use the 
default rewrites or
    * build its own rewriting rules.
    * <P>
@@ -50,6 +49,9 @@ trait DistributedEngine {
   /** Engine-specific colSums implementation based on a checkpoint. */
   def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
+  /** Optional engine-specific all reduce tensor operation. */
+  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: 
BlockMapFunc2[K], rf: BlockReduceFunc): Matrix
+
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a 
checkpoint. */
   def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector
 
@@ -73,20 +75,39 @@ trait DistributedEngine {
   def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: 
DistributedContext): CheckpointedDrm[_]
 
   /** Parallelize in-core matrix as spark distributed matrix, using row 
ordinal indices as data set keys. */
-  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)(implicit 
sc: DistributedContext):
+  CheckpointedDrm[Int]
 
   /** Parallelize in-core matrix as spark distributed matrix, using row labels 
as a data set keys. */
-  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
-      (implicit sc: DistributedContext): CheckpointedDrm[String]
+  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)(implicit 
sc: DistributedContext):
+  CheckpointedDrm[String]
 
   /** This creates an empty DRM with specified number of partitions and 
cardinality. */
-  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: DistributedContext): CheckpointedDrm[Int]
+  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 
10)(implicit sc: DistributedContext):
+  CheckpointedDrm[Int]
 
   /** Creates empty DRM with non-trivial height */
-  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-      (implicit sc: DistributedContext): CheckpointedDrm[Long]
+  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 
10)(implicit sc: DistributedContext):
+  CheckpointedDrm[Long]
+
+  /**
+   * Convert non-int-keyed matrix to an int-keyed, computing optionally 
mapping from old keys
+   * to row indices in the new one. The mapping, if requested, is returned as 
a 1-column matrix.
+   */
+  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = 
false): (DrmLike[Int], Option[DrmLike[K]])
+
+  /**
+   * (Optional) Sampling operation. Consistent with Spark semantics of the 
same.
+   * @param drmX
+   * @param fraction
+   * @param replacement
+   * @tparam K
+   * @return
+   */
+  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, 
replacement: Boolean = false): DrmLike[K]
+
+  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, 
replacement:Boolean = false) : Matrix
+
   /**
    * Load IndexedDataset from text delimited format.
    * @param src comma delimited URIs to read from
@@ -119,38 +140,49 @@ object DistributedEngine {
   private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
 
     action match {
-      case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a))
-      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a))
+
+      // self element-wise rewrite
+      case OpAewB(a, b, op) if (a == b) => {
+        op match {
+          case "*" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x * x)
+          case "/" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x / x)
+          // Self "+" and "-" don't make a lot of sense, but we do include it 
for completeness.
+          case "+" ⇒ OpAewUnaryFunc(pass1(a), 2.0 * _)
+          case "-" ⇒ OpAewUnaryFunc(pass1(a), (_) ⇒ 0.0)
+          case _ ⇒
+          require(false, s"Unsupported operator $op")
+            null
+        }
+      }
+      case OpAB(OpAt(a), b) if (a == b) ⇒ OpAtA(pass1(a))
+      case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) ⇒ OpAtA(pass1(a))
 
       // For now, rewrite left-multiply via transpositions, i.e.
       // inCoreA %*% B = (B' %*% inCoreA')'
-      case op@OpTimesLeftMatrix(a, b) =>
-        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+      case op@OpTimesLeftMatrix(a, b) ⇒
+      OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
 
       // Add vertical row index concatenation for rbind() on DrmLike[Int] 
fragments
-      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) =>
+      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) ⇒
 
         // Make sure closure sees only local vals, not attributes. We need to 
do these ugly casts
         // around because compiler could not infer that K is the same as Int, 
based on if() above.
         val ma = safeToNonNegInt(a.nrow)
-        val bAdjusted = new OpMapBlock[Int, Int](
-          A = pass1(b.asInstanceOf[DrmLike[Int]]),
-          bmf = {
-            case (keys, block) => keys.map(_ + ma) -> block
-          },
-          identicallyPartitioned = false
-        )
+        val bAdjusted = new OpMapBlock[Int, Int](A = 
pass1(b.asInstanceOf[DrmLike[Int]]), bmf = {
+          case (keys, block) ⇒ keys.map(_ + ma) → block
+        }, identicallyPartitioned = false)
         val aAdjusted = a.asInstanceOf[DrmLike[Int]]
         OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
 
       // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
+      case cd: CheckpointedDrm[_] ⇒ action
 
       // For everything else we just pass-thru the operator arguments to 
optimizer
-      case uop: AbstractUnaryOp[_, K] =>
+      case uop: AbstractUnaryOp[_, K] ⇒
         uop.A = pass1(uop.A)(uop.classTagA)
         uop
-      case bop: AbstractBinaryOp[_, _, K] =>
+
+      case bop: AbstractBinaryOp[_, _, K] ⇒
         bop.A = pass1(bop.A)(bop.classTagA)
         bop.B = pass1(bop.B)(bop.classTagB)
         bop
@@ -160,17 +192,30 @@ object DistributedEngine {
   /** This would remove stuff like A.t.t that previous step may have created */
   private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
     action match {
+
+      // Fusion of unary funcs into single, like 1 + x * x.
+      // Since we repeating the pass over self after rewrite, we dont' need to 
descend into arguments
+      // recursively here.
+      case op1@OpAewUnaryFunc(op2@OpAewUnaryFunc(a, _, _), _, _) ⇒
+        pass2(OpAewUnaryFuncFusion(a, op1 :: op2 :: Nil))
+
+      // Fusion one step further, like 1 + 2 * x * x. All should be rewritten 
as one UnaryFuncFusion.
+      // Since we repeating the pass over self after rewrite, we dont' need to 
descend into arguments
+      // recursively here.
+      case op@OpAewUnaryFuncFusion(op2@OpAewUnaryFunc(a, _, _), _) ⇒
+        pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2))
+
       // A.t.t => A
-      case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA)
+      case OpAt(top@OpAt(a)) ⇒  pass2(a)(top.classTagA)
 
       // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
+      case cd: CheckpointedDrm[_] ⇒  action
 
       // For everything else we just pass-thru the operator arguments to 
optimizer
-      case uop: AbstractUnaryOp[_, K] =>
+      case uop: AbstractUnaryOp[_, K] ⇒
         uop.A = pass2(uop.A)(uop.classTagA)
         uop
-      case bop: AbstractBinaryOp[_, _, K] =>
+      case bop: AbstractBinaryOp[_, _, K] ⇒
         bop.A = pass2(bop.A)(bop.classTagA)
         bop.B = pass2(bop.B)(bop.classTagB)
         bop
@@ -182,29 +227,29 @@ object DistributedEngine {
     action match {
 
       // matrix products.
-      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))
+      case OpAB(a, OpAt(b)) ⇒  OpABt(pass3(a), pass3(b))
 
       // AtB cases that make sense.
-      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => 
OpAtB(pass3(a), pass3(b))
-      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))
+      case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) ⇒  
OpAtB(pass3(a), pass3(b))
+      case OpABAnyKey(OpAtAnyKey(a), b) ⇒  OpAtB(pass3(a), pass3(b))
 
       // Need some cost to choose between the following.
 
-      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
+      case OpAB(OpAt(a), b) ⇒  OpAtB(pass3(a), pass3(b))
       //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
-      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+      case OpAB(a, b) ⇒  OpABt(pass3(a), OpAt(pass3(b)))
 
       // Rewrite A'x
-      case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
+      case op@OpAx(op1@OpAt(a), x) ⇒  OpAtx(pass3(a)(op1.classTagA), x)
 
       // Stop at checkpoints
-      case cd: CheckpointedDrm[_] => action
+      case cd: CheckpointedDrm[_] ⇒  action
 
       // For everything else we just pass-thru the operator arguments to 
optimizer
-      case uop: AbstractUnaryOp[_, K] =>
+      case uop: AbstractUnaryOp[_, K] ⇒
         uop.A = pass3(uop.A)(uop.classTagA)
         uop
-      case bop: AbstractBinaryOp[_, _, K] =>
+      case bop: AbstractBinaryOp[_, _, K] ⇒
         bop.A = pass3(bop.A)(bop.classTagA)
         bop.B = pass3(bop.B)(bop.classTagB)
         bop

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
index e5cf563..96ef893 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
@@ -18,7 +18,11 @@
 package org.apache.mahout.math.drm
 
 import RLikeDrmOps._
-import scala.reflect.ClassTag
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm.logical.OpCbindScalar
+import scalabindings._
+import RLikeOps._
+import reflect.ClassTag
 
 class DrmDoubleScalarOps(val x:Double) extends AnyVal{
 
@@ -30,4 +34,6 @@ class DrmDoubleScalarOps(val x:Double) extends AnyVal{
 
   def /[K:ClassTag](that:DrmLike[K]) = x /: that
 
+  def cbind[K: ClassTag](that: DrmLike[K]) = OpCbindScalar(A = that, x = x, 
leftBind = true)
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index bc937d6..19432d0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -49,7 +49,7 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: 
DrmLike[K]) {
    *             is applied.
    */
   def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
-    assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
+    require(min > 0 || exact > 0 || auto, "Invalid argument")
     OpPar(drm, minSplits = min, exactSplits = exact)
   }
 
@@ -65,16 +65,15 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: 
DrmLike[K]) {
    * @tparam R
    * @return
    */
-  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = 
true)
+  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = 
true)
       (bmf: BlockMapFunc[K, R]): DrmLike[R] =
     new OpMapBlock[K, R](
       A = drm,
       bmf = bmf,
       _ncol = ncol,
-      identicallyPartitioned = identicallyParitioned
+      identicallyPartitioned = identicallyPartitioned
     )
 
-
   /**
    * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 
until 5, 5 until 15)).<P>
    *

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index 380f4eb..7927e51 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -18,12 +18,17 @@
 package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
+import collection._
+import JavaConversions._
 import org.apache.mahout.math.{Vector, Matrix}
 import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
 
 class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
 
   import RLikeDrmOps._
+  import org.apache.mahout.math.scalabindings._
 
   def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
 
@@ -33,21 +38,23 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends 
DrmLikeOps[K](drm) {
 
   def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
 
-  def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op 
= "+")
+  def +(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ + that, 
evalZeros = true)
 
-  def +:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, 
op = "+")
+  def +:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that + _, 
evalZeros = true)
 
-  def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op 
= "-")
+  def -(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ - that, 
evalZeros = true)
 
-  def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, 
op = "-:")
+  def -:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that - _, 
evalZeros = true)
 
-  def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op 
= "*")
+  def *(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ * that)
 
-  def *:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, 
op = "*")
+  def *:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that * _)
 
-  def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op 
= "/")
+  def ^(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = 
math.pow(_, that))
 
-  def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, 
op = "/:")
+  def /(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ / that, 
evalZeros = that == 0.0)
+
+  def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, 
evalZeros = true)
 
   def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that)
 
@@ -65,18 +72,36 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends 
DrmLikeOps[K](drm) {
 
   def t: DrmLike[Int] = OpAtAnyKey(A = drm)
 
-  def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
+  def cbind(that: DrmLike[K]): DrmLike[K] = OpCbind(A = this.drm, B = that)
+
+  def cbind(that: Double): DrmLike[K] = OpCbindScalar(A = this.drm, x = that, 
leftBind = false)
+
+  def rbind(that: DrmLike[K]): DrmLike[K] = OpRbind(A = this.drm, B = that)
 
-  def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that)
+  /**
+   * `rowSums` method for non-int keyed matrices.
+   *
+   * Slight problem here is the limitation of in-memory representation of 
Colt's Matrix, which can
+   * only have String row labels. Therefore, internally we do ".toString()" 
call on each key object,
+   * and then put it into [[Matrix]] row label bindings, at which point they 
are coerced to be Strings.
+   *
+   * This is obviously a suboptimal behavior, so as TODO we have here future 
enhancements of `collect'.
+   *
+   * @return map of row keys into row sums, front-end collected.
+   */
+  def rowSumsMap(): Map[String, Double] = {
+    val m = drm.mapBlock(ncol = 1) { case (keys, block) =>
+      keys -> dense(block.rowSums).t
+    }.collect
+    m.getRowLabelBindings.map { case (key, idx) => key -> m(idx, 0)}
+  }
 }
 
 class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
 
   import org.apache.mahout.math._
   import scalabindings._
-  import RLikeOps._
   import RLikeDrmOps._
-  import scala.collection.JavaConversions._
 
   override def t: DrmLike[Int] = OpAt(A = drm)
 
@@ -108,7 +133,7 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends 
RLikeDrmOps[Int](drm) {
       // Collect block-wise row means and output them as one-column matrix.
       keys -> dense(block.rowMeans).t
     }
-        .collect(::, 0)
+      .collect(::, 0)
   }
 
   /** Return diagonal vector */
@@ -117,14 +142,14 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends 
RLikeDrmOps[Int](drm) {
     drm.mapBlock(ncol = 1) { case (keys, block) =>
       keys -> dense(for (r <- block.view) yield r(keys(r.index))).t
     }
-        .collect(::, 0)
+      .collect(::, 0)
   }
 
 }
 
 object RLikeDrmOps {
 
-  implicit def double2ScalarOps(x:Double) = new DrmDoubleScalarOps(x)
+  implicit def double2ScalarOps(x: Double) = new DrmDoubleScalarOps(x)
 
   implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new 
RLikeDrmIntOps(drm)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index a445f21..60b2c77 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.math.drm.{DistributedContext, 
DrmLike}
 abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
     extends CheckpointAction[K] with DrmLike[K] {
 
-  protected[drm] var A: DrmLike[A]
+  protected[mahout] var A: DrmLike[A]
 
   lazy val context: DistributedContext = A.context
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
index aa3a3b9..a7934a3 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -37,7 +37,8 @@ abstract class CheckpointAction[K: ClassTag] extends 
DrmLike[K] {
    */
   def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp 
match {
     case None =>
-      val physPlan = context.toPhysical(context.optimizerRewrite(this), 
cacheHint)
+      val plan = context.optimizerRewrite(this)
+      val physPlan = context.toPhysical(plan, cacheHint)
       cp = Some(physPlan)
       physPlan
     case Some(cp) => cp

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
index 19a910c..dbcb366 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala
@@ -21,7 +21,11 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 
-/** Operator denoting expressions like 5.0 - A or A * 5.6 */
+/**
+ * Operator denoting expressions like 5.0 - A or A * 5.6
+ *
+ * @deprecated use [[OpAewUnaryFunc]] instead
+ */
 case class OpAewScalar[K: ClassTag](
     override var A: DrmLike[K],
     val scalar: Double,

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
new file mode 100644
index 0000000..71489ab
--- /dev/null
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+
+/**
+ * @author dmitriy
+ */
+case class OpAewUnaryFunc[K: ClassTag](
+    override var A: DrmLike[K],
+    val f: (Double) => Double,
+    val evalZeros:Boolean = false
+    ) extends AbstractUnaryOp[K,K] with TEwFunc {
+
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.canHaveMissingRows)
+      Random.nextLong()
+    else A.partitioningTag
+
+  /** Stuff like `A +1` is always supposed to fix this */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
new file mode 100644
index 0000000..ed95f4f
--- /dev/null
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+import collection._
+
+/**
+ * Composition of unary elementwise functions.
+ */
+case class OpAewUnaryFuncFusion[K: ClassTag](
+    override var A: DrmLike[K],
+    var ff:List[OpAewUnaryFunc[K]] = Nil
+    ) extends AbstractUnaryOp[K,K] with TEwFunc {
+
+  override protected[mahout] lazy val partitioningTag: Long =
+    if (A.canHaveMissingRows)
+      Random.nextLong()
+    else A.partitioningTag
+
+  /** Stuff like `A +1` is always supposed to fix this */
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+  /** Apply to degenerate elements? */
+  override def evalZeros: Boolean = ff.exists(_.evalZeros)
+
+  /** the function itself */
+  override def f: (Double) => Double = {
+
+    // Make sure composed collection becomes an attribute of this closure 
because we will be sending
+    // it to the backend.
+    val composedFunc = ff.map(_.f)
+
+    // Create functional closure and return.
+    (x: Double) => (composedFunc :\ x) { case (f, xarg) => f(xarg)}
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
index 1425264..0598551 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
+import reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
new file mode 100644
index 0000000..5aee518
--- /dev/null
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.math.drm.logical
+
+import reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+
+case class OpCbindScalar[K:ClassTag](
+  override var A:DrmLike[K],
+  var x:Double,
+  val leftBind:Boolean ) extends AbstractUnaryOp[K,K] {
+
+  override protected[mahout] lazy val canHaveMissingRows: Boolean = false
+
+  override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol + 1
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
index 7299d9e..a1cd718 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala
@@ -23,7 +23,7 @@ import RLikeOps._
 import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike}
 import scala.util.Random
 
-class OpMapBlock[S: ClassTag, R: ClassTag](
+case class OpMapBlock[S: ClassTag, R: ClassTag](
     override var A: DrmLike[S],
     val bmf: BlockMapFunc[S, R],
     val _ncol: Int = -1,

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala
new file mode 100644
index 0000000..0eb5f65
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+/**
+ * Trait denoting logical operators providing elementwise operations that work 
as unary operators
+ * on each element of a matrix.
+ */
+trait TEwFunc {
+
+  /** Apply to degenerate elments? */
+  def evalZeros: Boolean
+
+  /** the function itself */
+  def f: (Double) => Double
+
+  /**
+   * Self assignment ok? If yes, may cause side effects if works off 
non-serialized cached object
+   * tree!
+   */
+  def selfAssignOk: Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 1fae831..d865b58 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -23,6 +23,8 @@ import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
 
 import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
+import collection._
 
 package object drm {
 
@@ -34,7 +36,11 @@ package object drm {
 
 
   /** Block-map func */
-  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R]
+  type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] ⇒ BlockifiedDrmTuple[R]
+
+  type BlockMapFunc2[S] = BlockifiedDrmTuple[S] ⇒ Matrix
+
+  type BlockReduceFunc = (Matrix, Matrix) ⇒ Matrix
 
   /** CacheHint type */
   //  type CacheHint = CacheHint.CacheHint
@@ -92,7 +98,7 @@ package object drm {
   implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect
 
   /** Do vertical concatenation of collection of blockified tuples */
-  def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): 
BlockifiedDrmTuple[K] = {
+  private[mahout] def rbind[K: ClassTag](blocks: 
Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = {
     assert(blocks.nonEmpty, "rbind: 0 blocks passed in")
     if (blocks.size == 1) {
       // No coalescing required.
@@ -115,6 +121,46 @@ package object drm {
     }
   }
 
+  /**
+   * Convert arbitrarily-keyed matrix to int-keyed matrix. Some algebra will 
accept only int-numbered
+   * row matrices. So this method is to help.
+   *
+   * @param drmX input to be transcoded
+   * @param computeMap collect `old key -> int key` map to front-end?
+   * @tparam K key type
+   * @return Sequentially keyed matrix + (optionally) map from non-int key to 
[[Int]] key. If the
+   *         key type is actually Int, then we just return the argument with 
None for the map,
+   *         regardless of computeMap parameter.
+   */
+  def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = 
false): (DrmLike[Int], Option[DrmLike[K]]) =
+    drmX.context.engine.drm2IntKeyed(drmX, computeMap)
+
+  /**
+   * (Optional) Sampling operation. Consistent with Spark semantics of the 
same.
+   * @param drmX
+   * @param fraction
+   * @param replacement
+   * @tparam K
+   * @return samples
+   */
+  def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, 
replacement: Boolean = false): DrmLike[K] =
+    drmX.context.engine.drmSampleRows(drmX, fraction, replacement)
+
+  def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, 
replacement: Boolean = false): Matrix =
+    drmX.context.engine.drmSampleKRows(drmX, numSamples, replacement)
+
+  ///////////////////////////////////////////////////////////
+  // Elementwise unary functions on distributed operands.
+  def dexp[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new 
OpAewUnaryFunc[K](drmA, math.exp, true)
+
+  def dlog[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new 
OpAewUnaryFunc[K](drmA, math.log, true)
+
+  def dabs[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new 
OpAewUnaryFunc[K](drmA, math.abs)
+
+  def dsqrt[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new 
OpAewUnaryFunc[K](drmA, math.sqrt)
+
+  def dsignum[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new 
OpAewUnaryFunc[K](drmA, math.signum)
+
 }
 
 package object indexeddataset {

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala
deleted file mode 100644
index 9fdd6e5..0000000
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.scalabindings
-
-import org.apache.mahout.math._
-
-class DoubleScalarOps(val x:Double) extends AnyVal{
-
-  import RLikeOps._
-
-  def +(that:Matrix) = that + x
-
-  def +(that:Vector) = that + x
-
-  def *(that:Matrix) = that * x
-
-  def *(that:Vector) = that * x
-
-  def -(that:Matrix) = x -: that
-
-  def -(that:Vector) = x -: that
-
-  def /(that:Matrix) = x /: that
-
-  def /(that:Vector) = x /: that
-
-}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
new file mode 100644
index 0000000..d0fd393
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.apache.mahout.math._
+import org.apache.mahout.math.flavor.{BackEnum, TraversingStructureEnum}
+import org.apache.mahout.math.function.Functions
+import RLikeOps._
+import org.apache.mahout.logging._
+
+import scala.collection.JavaConversions._
+import scala.collection._
+
+object MMul extends MMBinaryFunc {
+
+  private final implicit val log = getLog(MMul.getClass)
+
+  override def apply(a: Matrix, b: Matrix, r: Option[Matrix]): Matrix = {
+
+    require(a.ncol == b.nrow, "Incompatible matrix sizes in matrix 
multiplication.")
+
+    val (af, bf) = (a.getFlavor, b.getFlavor)
+    val backs = (af.getBacking, bf.getBacking)
+    val sd = (af.getStructure, af.isDense, bf.getStructure, bf.isDense)
+
+    val alg: MMulAlg = backs match {
+
+      // Both operands are jvm memory backs.
+      case (BackEnum.JVMMEM, BackEnum.JVMMEM) ⇒
+
+        sd match {
+
+          // Multiplication cases by a diagonal matrix.
+          case (TraversingStructureEnum.VECTORBACKED, _, 
TraversingStructureEnum.COLWISE, _) if (a
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
+          case (TraversingStructureEnum.VECTORBACKED, _, 
TraversingStructureEnum.SPARSECOLWISE, _) if (a
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
+          case (TraversingStructureEnum.VECTORBACKED, _, 
TraversingStructureEnum.ROWWISE, _) if (a
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
+          case (TraversingStructureEnum.VECTORBACKED, _, 
TraversingStructureEnum.SPARSEROWWISE, _) if (a
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
+
+          case (TraversingStructureEnum.COLWISE, _, 
TraversingStructureEnum.VECTORBACKED, _) if (b
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
+          case (TraversingStructureEnum.SPARSECOLWISE, _, 
TraversingStructureEnum.VECTORBACKED, _) if (b
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
+          case (TraversingStructureEnum.ROWWISE, _, 
TraversingStructureEnum.VECTORBACKED, _) if (b
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
+          case (TraversingStructureEnum.SPARSEROWWISE, _, 
TraversingStructureEnum.VECTORBACKED, _) if (b
+            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
+
+          // Dense-dense cases
+          case (TraversingStructureEnum.ROWWISE, true, 
TraversingStructureEnum.COLWISE, true) if (a eq b.t) ⇒ jvmDRWAAt _
+          case (TraversingStructureEnum.ROWWISE, true, 
TraversingStructureEnum.COLWISE, true) if (a.t eq b) ⇒ jvmDRWAAt _
+          case (TraversingStructureEnum.ROWWISE, true, 
TraversingStructureEnum.COLWISE, true) ⇒ jvmRWCW
+          case (TraversingStructureEnum.ROWWISE, true, 
TraversingStructureEnum.ROWWISE, true) ⇒ jvmRWRW
+          case (TraversingStructureEnum.COLWISE, true, 
TraversingStructureEnum.COLWISE, true) ⇒ jvmCWCW
+          case (TraversingStructureEnum.COLWISE, true, 
TraversingStructureEnum.ROWWISE, true) if ( a eq b.t) ⇒ jvmDCWAAt _
+          case (TraversingStructureEnum.COLWISE, true, 
TraversingStructureEnum.ROWWISE, true) if ( a.t eq b) ⇒ jvmDCWAAt _
+          case (TraversingStructureEnum.COLWISE, true, 
TraversingStructureEnum.ROWWISE, true) ⇒ jvmCWRW
+
+          // Sparse row matrix x sparse row matrix (array of vectors)
+          case (TraversingStructureEnum.ROWWISE, false, 
TraversingStructureEnum.ROWWISE, false) ⇒ jvmSparseRWRW
+          case (TraversingStructureEnum.ROWWISE, false, 
TraversingStructureEnum.COLWISE, false) ⇒ jvmSparseRWCW
+          case (TraversingStructureEnum.COLWISE, false, 
TraversingStructureEnum.ROWWISE, false) ⇒ jvmSparseCWRW
+          case (TraversingStructureEnum.COLWISE, false, 
TraversingStructureEnum.COLWISE, false) ⇒ jvmSparseCWCW
+
+          // Sparse matrix x sparse matrix (hashtable of vectors)
+          case (TraversingStructureEnum.SPARSEROWWISE, false, 
TraversingStructureEnum.SPARSEROWWISE, false) ⇒
+            jvmSparseRowRWRW
+          case (TraversingStructureEnum.SPARSEROWWISE, false, 
TraversingStructureEnum.SPARSECOLWISE, false) ⇒
+            jvmSparseRowRWCW
+          case (TraversingStructureEnum.SPARSECOLWISE, false, 
TraversingStructureEnum.SPARSEROWWISE, false) ⇒
+            jvmSparseRowCWRW
+          case (TraversingStructureEnum.SPARSECOLWISE, false, 
TraversingStructureEnum.SPARSECOLWISE, false) ⇒
+            jvmSparseRowCWCW
+
+          // Sparse matrix x non-like
+          case (TraversingStructureEnum.SPARSEROWWISE, false, 
TraversingStructureEnum.ROWWISE, _) ⇒ jvmSparseRowRWRW
+          case (TraversingStructureEnum.SPARSEROWWISE, false, 
TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseRowRWCW
+          case (TraversingStructureEnum.SPARSECOLWISE, false, 
TraversingStructureEnum.ROWWISE, _) ⇒ jvmSparseRowCWRW
+          case (TraversingStructureEnum.SPARSECOLWISE, false, 
TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseCWCW
+          case (TraversingStructureEnum.ROWWISE, _, 
TraversingStructureEnum.SPARSEROWWISE, false) ⇒ jvmSparseRWRW
+          case (TraversingStructureEnum.ROWWISE, _, 
TraversingStructureEnum.SPARSECOLWISE, false) ⇒ jvmSparseRWCW
+          case (TraversingStructureEnum.COLWISE, _, 
TraversingStructureEnum.SPARSEROWWISE, false) ⇒ jvmSparseCWRW
+          case (TraversingStructureEnum.COLWISE, _, 
TraversingStructureEnum.SPARSECOLWISE, false) ⇒ jvmSparseRowCWCW
+
+          // Everything else including at least one sparse LHS or RHS argument
+          case (TraversingStructureEnum.ROWWISE, false, 
TraversingStructureEnum.ROWWISE, _) ⇒ jvmSparseRWRW
+          case (TraversingStructureEnum.ROWWISE, false, 
TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseRWCW
+          case (TraversingStructureEnum.COLWISE, false, 
TraversingStructureEnum.ROWWISE, _) ⇒ jvmSparseCWRW
+          case (TraversingStructureEnum.COLWISE, false, 
TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseCWCW2flips
+
+          // Sparse methods are only effective if the first argument is 
sparse, so we need to do a swap.
+          case (_, _, _, false) ⇒ { (a, b, r) ⇒ apply(b.t, a.t, r.map 
{_.t}).t }
+
+          // Default jvm-jvm case.
+          case _ ⇒ jvmRWCW
+        }
+    }
+
+    alg(a, b, r)
+  }
+
+  type MMulAlg = MMBinaryFunc
+
+  @inline
+  private def jvmRWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix 
= {
+
+    require(r.forall(mxR ⇒ mxR.nrow == a.nrow && mxR.ncol == b.ncol))
+    val (m, n) = (a.nrow, b.ncol)
+
+    val mxR = r.getOrElse(if (a.getFlavor.isDense) a.like(m, n) else b.like(m, 
n))
+
+    for (row ← 0 until mxR.nrow; col ← 0 until mxR.ncol) {
+      // this vector-vector should be sort of optimized, right?
+      mxR(row, col) = a(row, ::) dot b(::, col)
+    }
+    mxR
+  }
+
+
+  @inline
+  private def jvmRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix 
= {
+
+    // A bit hackish: currently, this relies a bit on the fact that like 
produces RW(?)
+    val bclone = b.like(b.ncol, b.nrow).t
+    for (brow ← b) bclone(brow.index(), ::) := brow
+
+    require(bclone.getFlavor.getStructure == TraversingStructureEnum.COLWISE 
|| bclone.getFlavor.getStructure ==
+      TraversingStructureEnum.SPARSECOLWISE, "COL wise conversion assumption 
of RHS is wrong, do over this code.")
+
+    jvmRWCW(a, bclone, r)
+  }
+
+  private def jvmCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix 
= {
+    jvmRWRW(b.t, a.t, r.map(_.t)).t
+  }
+
+  private def jvmCWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix 
= {
+    // This is a primary contender with Outer Prod sum algo.
+    // Here, we force-reorient both matrices and run RWCW.
+    // A bit hackish: currently, this relies a bit on the fact that clone 
always produces RW(?)
+    val aclone = a.cloned
+
+    require(aclone.getFlavor.getStructure == TraversingStructureEnum.ROWWISE 
|| aclone.getFlavor.getStructure ==
+      TraversingStructureEnum.SPARSEROWWISE, "Row wise conversion assumption 
of RHS is wrong, do over this code.")
+
+    jvmRWRW(aclone, b, r)
+  }
+
+  private def jvmSparseRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): 
Matrix = {
+    val mxR = r.getOrElse(b.like(a.nrow, b.ncol))
+
+    // This is basically almost the algorithm from SparseMatrix.times
+    for (arow ← a; ael ← arow.nonZeroes)
+      mxR(arow.index(), ::).assign(b(ael.index, ::), Functions.plusMult(ael))
+
+    mxR
+  }
+
+  private def jvmSparseRowRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = 
None): Matrix = {
+    val mxR = r.getOrElse(b.like(a.nrow, b.ncol))
+    for (arow ← a.iterateNonEmpty(); ael ← arow.vector.nonZeroes)
+      mxR(arow.index(), ::).assign(b(ael.index, ::), Functions.plusMult(ael))
+
+    mxR
+  }
+
+  private def jvmSparseRowCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) 
=
+    jvmSparseRowRWRW(b.t, a.t, r.map(_.t)).t
+
+  private def jvmSparseRowCWCW2flips(a: Matrix, b: Matrix, r: Option[Matrix] = 
None) =
+    jvmSparseRowRWRW(a cloned, b cloned, r)
+
+  private def jvmSparseRowRWCW(a: Matrix, b: Matrix, r: Option[Matrix]) =
+    jvmSparseRowRWRW(a, b cloned, r)
+
+
+  private def jvmSparseRowCWRW(a: Matrix, b: Matrix, r: Option[Matrix]) =
+    jvmSparseRowRWRW(a cloned, b, r)
+
+  private def jvmSparseRWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(a, b.cloned, r)
+
+  private def jvmSparseCWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(a cloned, b, r)
+
+  private def jvmSparseCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(b.t, a.t, r.map(_.t)).t
+
+  private def jvmSparseCWCW2flips(a: Matrix, b: Matrix, r: Option[Matrix] = 
None) =
+    jvmSparseRWRW(a cloned, b cloned, r)
+
+  private def jvmDiagRW(diagm:Matrix, b:Matrix, r:Option[Matrix] = 
None):Matrix = {
+    val mxR = r.getOrElse(b.like(diagm.nrow, b.ncol))
+
+    for (del ← diagm.diagv.nonZeroes())
+      mxR(del.index, ::).assign(b(del.index, ::), Functions.plusMult(del))
+
+    mxR
+  }
+
+  private def jvmDiagCW(diagm: Matrix, b: Matrix, r: Option[Matrix] = None): 
Matrix = {
+    val mxR = r.getOrElse(b.like(diagm.nrow, b.ncol))
+    for (bcol ← b.t) mxR(::, bcol.index()) := bcol * diagm.diagv
+    mxR
+  }
+
+  private def jvmCWDiag(a: Matrix, diagm: Matrix, r: Option[Matrix] = None) =
+    jvmDiagRW(diagm, a.t, r.map {_.t}).t
+
+  private def jvmRWDiag(a: Matrix, diagm: Matrix, r: Option[Matrix] = None) =
+    jvmDiagCW(diagm, a.t, r.map {_.t}).t
+
+
+  /** Dense column-wise AA' */
+  private def jvmDCWAAt(a:Matrix, b:Matrix, r:Option[Matrix] = None) = {
+    // a.t must be equiv. to b. Cloning must rewrite to row-wise.
+    jvmDRWAAt(a.cloned,null,r)
+  }
+
+  /** Dense Row-wise AA' */
+  private def jvmDRWAAt(a:Matrix, b:Matrix, r:Option[Matrix] = None) = {
+    // a.t must be equiv to b.
+
+    debug("AAt computation detected.")
+
+    // Check dimensions if result is supplied.
+    require(r.forall(mxR ⇒ mxR.nrow == a.nrow && mxR.ncol == a.nrow))
+
+    val mxR = r.getOrElse(a.like(a.nrow, a.nrow))
+
+    // This is symmetric computation. Compile upper triangular first.
+    for (row ← 0 until mxR.nrow) {
+      // diagonal value
+      mxR(row, row) = a(row, ::).aggregate(Functions.PLUS, Functions.SQUARE)
+
+      for ( col ← row + 1 until mxR.ncol) {
+        // this vector-vector should be sort of optimized, right?
+        val v = a(row, ::) dot a(col, ::)
+
+        mxR(row, col) = v
+        mxR(col,row) = v
+      }
+    }
+
+    mxR
+  }
+
+  private def jvmOuterProdSum(a: Matrix, b: Matrix, r: Option[Matrix] = None): 
Matrix = {
+
+    // This may be already laid out for outer product computation, which may 
be faster than reorienting
+    // both matrices? need to check.
+    val (m, n) = (a.nrow, b.ncol)
+
+    // Prefer col-wise result iff a is dense and b is sparse. In all other 
cases default to row-wise.
+    val preferColWiseR = a.getFlavor.isDense && !b.getFlavor.isDense
+
+    val mxR = r.getOrElse {
+      (a.getFlavor.isDense, preferColWiseR) match {
+        case (false, false) ⇒ b.like(m, n)
+        case (false, true) ⇒ b.like(n, m).t
+        case (true, false) ⇒ a.like(m, n)
+        case (true, true) ⇒ a.like(n, m).t
+      }
+    }
+
+    // Loop outer products
+    if (preferColWiseR) {
+      // this means B is sparse and A is not, so we need to iterate over b 
values and update R columns with +=
+      // one at a time.
+      for ((acol, brow) ← a.t.zip(b); bel ← brow.nonZeroes) mxR(::, 
bel.index()) += bel * acol
+    } else {
+      for ((acol, brow) ← a.t.zip(b); ael ← acol.nonZeroes()) 
mxR(ael.index(), ::) += ael * brow
+    }
+
+    mxR
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 910035f..3c0ae89 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -17,8 +17,10 @@
 
 package org.apache.mahout.math.scalabindings
 
+import org.apache.mahout.math.flavor.TraversingStructureEnum
 import org.apache.mahout.math.{Matrices, QRDecomposition, Vector, Matrix}
-import scala.collection.JavaConversions._
+import collection._
+import JavaConversions._
 import org.apache.mahout.math.function.{DoubleDoubleFunction, VectorFunction, 
DoubleFunction, Functions}
 import scala.math._
 
@@ -41,6 +43,10 @@ class MatrixOps(val m: Matrix) {
 
   def +=(that: Matrix) = m.assign(that, Functions.PLUS)
 
+  def +=:(that:Matrix) = m += that
+
+  def +=:(that:Double) = m += that
+
   def -=(that: Matrix) = m.assign(that, Functions.MINUS)
 
   def +=(that: Double) = m.assign(new DoubleFunction {
@@ -70,24 +76,30 @@ class MatrixOps(val m: Matrix) {
 
   def -:(that: Double) = that -=: cloned
 
-
-  def norm = sqrt(m.aggregate(Functions.PLUS, Functions.SQUARE))
+  def norm = math.sqrt(m.aggregate(Functions.PLUS, Functions.SQUARE))
 
   def pnorm(p: Int) = pow(m.aggregate(Functions.PLUS, 
Functions.chain(Functions.ABS, Functions.pow(p))), 1.0 / p)
 
   def apply(row: Int, col: Int) = m.get(row, col)
 
-  def update(row: Int, col: Int, v: Double): Matrix = {
-    m.setQuick(row, col, v);
+  def update(row: Int, col: Int, that: Double): Matrix = {
+    m.setQuick(row, col, that);
     m
   }
 
+  def update(rowRange: Range, colRange: Range, that: Double) = apply(rowRange, 
colRange) := that
+
+  def update(row: Int, colRange: Range, that: Double) = apply(row, colRange) 
:= that
+
+  def update(rowRange: Range, col: Int, that: Double) = apply(rowRange, col) 
:= that
+
   def update(rowRange: Range, colRange: Range, that: Matrix) = apply(rowRange, 
colRange) := that
 
   def update(row: Int, colRange: Range, that: Vector) = apply(row, colRange) 
:= that
 
   def update(rowRange: Range, col: Int, that: Vector) = apply(rowRange, col) 
:= that
-
+  
+  
   def apply(rowRange: Range, colRange: Range): Matrix = {
 
     if (rowRange == :: &&
@@ -140,12 +152,60 @@ class MatrixOps(val m: Matrix) {
     })
   }
 
+  def :=(that: Double) = m.assign(that)
+
   def :=(f: (Int, Int, Double) => Double): Matrix = {
-    for (r <- 0 until nrow; c <- 0 until ncol) m(r, c) = f(r, c, m(r, c))
+    import RLikeOps._
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | 
TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.all) el := f(el.index, col.index, el)
+      case default =>
+        for (row <- m; el <- row.all) el := f(row.index, el.index, el)
+    }
+    m
+  }
+
+  /** Functional assign with (Double) => Double */
+  def :=(f: (Double) => Double): Matrix = {
+    import RLikeOps._
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | 
TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.all) el := f(el)
+      case default =>
+        for (row <- m; el <- row.all) el := f(el)
+    }
     m
   }
 
-  def cloned: Matrix = m.like := m
+  /** Sparse assign: iterate and assign over non-zeros only */
+  def ::=(f: (Int, Int, Double) => Double): Matrix = {
+
+    import RLikeOps._
+
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | 
TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.nonZeroes) el := f(el.index, col.index, el)
+      case default =>
+        for (row <- m; el <- row.nonZeroes) el := f(row.index, el.index, el)
+    }
+    m
+  }
+
+  /** Sparse function assign: iterate and assign over non-zeros only */
+  def ::=(f: (Double) => Double): Matrix = {
+
+    import RLikeOps._
+
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | 
TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.nonZeroes) el := f(el)
+      case default =>
+        for (row <- m; el <- row.nonZeroes) el := f(el)
+    }
+    m
+  }
+
+    def cloned: Matrix = m.like := m
 
   /**
    * Ideally, we would probably want to override equals(). But that is not
@@ -155,11 +215,14 @@ class MatrixOps(val m: Matrix) {
    * @return
    */
   def equiv(that: Matrix) =
+
+  // Warning: TODO: This would actually create empty objects in SparseMatrix. 
Should really implement
+  // merge-type comparison strategy using iterateNonEmpty.
     that != null &&
-        nrow == that.nrow &&
-        m.view.zip(that).forall(t => {
-          t._1.equiv(t._2)
-        })
+      nrow == that.nrow &&
+      m.view.zip(that).forall(t => {
+        t._1.equiv(t._2)
+      })
 
   def nequiv(that: Matrix) = !equiv(that)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
new file mode 100644
index 0000000..a1e9377
--- /dev/null
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.apache.mahout.math._
+
+class RLikeDoubleScalarOps(val x:Double) extends AnyVal{
+
+  import RLikeOps._
+
+  def +(that:Matrix) = that + x
+
+  def +(that:Vector) = that + x
+
+  def *(that:Matrix) = that * x
+
+  def *(that:Vector) = that * x
+
+  def -(that:Matrix) = x -: that
+
+  def -(that:Vector) = x -: that
+
+  def /(that:Matrix) = x /: that
+
+  def /(that:Vector) = x /: that
+  
+  def cbind(that:Matrix) = {
+    val mx = that.like(that.nrow, that.ncol + 1)
+    mx(::, 1 until mx.ncol) := that
+    if (x != 0.0) mx(::, 0) := x
+    mx
+  }
+
+  def rbind(that: Matrix) = {
+    val mx = that.like(that.nrow + 1, that.ncol)
+    mx(1 until mx.nrow, ::) := that
+    if (x != 0.0) mx(0, ::) := x
+    mx
+  }
+
+  def c(that: Vector): Vector = {
+    val cv = that.like(that.length + 1)
+    cv(1 until cv.length) := that
+    cv(0) = x
+    cv
+  }
+
+}

Reply via email to