MAHOUT-1660 MAHOUT-1713 MAHOUT-1714 MAHOUT-1715 MAHOUT-1716 MAHOUT-1717 MAHOUT-1718 MAHOUT-1719 MAHOUT-1720 MAHOUT-1721 MAHOUT-1722 MAHOUT-1723 MAHOUT-1724 MAHOUT-1725 MAHOUT-1726 MAHOUT-1727 MAHOUT-1728 MAHOUT-1729 MAHOUT-1730 MAHOUT-1731 MAHOUT-1732 Cumulative patch for the above issues. Closes apache/mahout#135
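MAHOUT-1716 (Scala logging style) adds helpers in `org.apache.mahout.logging` that take the log message as a by-name parameter. The point is that an expensive message, such as a formatted `A'A` matrix dump, is only built when its level is enabled. The following is a minimal Java analogue of that idea, not the Mahout code itself. `LazyLog` is a hypothetical stand-in for log4j's `Logger`, and `java.util.function.Supplier` plays the role of Scala's by-name argument. Each helper checks its own level and records the message at that same level.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a log4j Logger, used only to illustrate lazy message construction.
class LazyLog {
    // Levels in increasing severity, loosely mirroring log4j's ordering.
    enum Level { TRACE, DEBUG, INFO, WARN, ERROR, FATAL }

    private Level threshold;
    private final List<String> records = new ArrayList<>();

    LazyLog(Level threshold) { this.threshold = threshold; }

    void setLevel(Level threshold) { this.threshold = threshold; }

    boolean isEnabledFor(Level level) { return level.compareTo(threshold) >= 0; }

    // The Supplier is invoked only when the level is enabled, so a disabled
    // call never pays for building its message.
    void log(Level level, Supplier<?> msg) {
        if (isEnabledFor(level)) records.add(level + ": " + msg.get());
    }

    // Each helper delegates at its own level, e.g. error(...) is recorded as ERROR.
    void trace(Supplier<?> msg) { log(Level.TRACE, msg); }
    void debug(Supplier<?> msg) { log(Level.DEBUG, msg); }
    void info(Supplier<?> msg)  { log(Level.INFO, msg); }
    void warn(Supplier<?> msg)  { log(Level.WARN, msg); }
    void error(Supplier<?> msg) { log(Level.ERROR, msg); }

    List<String> records() { return records; }
}
```

With the threshold at `WARN`, a call such as `log.debug(() -> "A'A=\n" + dump)` never evaluates the string concatenation. This is the same effect the Scala helpers get from `msg: => AnyRef`.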
Squashed commit of the following:

commit c59bf8a21e1ad77dee80730772d2184b3f28a495
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 8 18:11:57 2015 -0700

handling degenerate matrix cases for rbind, cbind, and serialization (0 columns or rows)

commit 56b735e137355e174facffd409d6456360c2f8e7
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 8 16:58:34 2015 -0700

Inserting back the testing framework artifact being built. Need this as a dependency in subordinate projects that do method testing as well.

commit 7e6ce766d06c5a2337dd9b08df7c9fa37bd9a9c8
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 8 10:22:53 2015 -0700

adding "final" for logger per comment on public PR

commit e42bcedf8521b89c7583f8e7e299c2be0c2a8de2
Author: Dmitriy Lyubimov <[email protected]>
Date: Tue Jun 2 12:24:30 2015 -0700

final fixes in h20. fixing @deprecated warnings in atb

commit 00fb618ad0ef0e5b8aac30c88b23d2e9325ea8f8
Author: Dmitriy Lyubimov <[email protected]>
Date: Tue Jun 2 12:08:13 2015 -0700

h20 stuff

commit f4e15506ed2497bc2e179e3ded9ca399fd826d15
Author: Dmitriy Lyubimov <[email protected]>
Date: Tue Jun 2 11:55:30 2015 -0700

restoring merge errors in h2o module, nothing is touched here.

commit 1b892de589bccf03c41c6b2e49493472e6bd1d52
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 1 18:44:21 2015 -0700

Picking up missing changes on both sides in spark module. TODO: Pat's similarity driver tests fail, seems, on some degenerate splitting in optimizer. Need to take a look.

commit 3422046b94c03d43a91f091e38532339cf890351
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 1 18:13:03 2015 -0700

Adding missing change. uncommenting performance in-core tests.

commit 7aa5de5431ad01c37e8069956287194c97b37b06
Author: Dmitriy Lyubimov <[email protected]>
Date: Mon Jun 1 17:54:17 2015 -0700

Initial merge with ora private-review branch.
Stuff compiles up to h2o (which needs to be added some unimplemented stuff) and ssvd tests are failing in math-scala module due to lack of matrix flavor on mmul. They are not failing in private branch though -- some changes still have not been merged? Most changes i care about seems to be there though.

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/8a6b805a
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/8a6b805a
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/8a6b805a

Branch: refs/heads/mahout-0.10.x
Commit: 8a6b805a3c15080f28be050a83b1ad26a60f21e6
Parents: e6d24b9
Author: Dmitriy Lyubimov <[email protected]>
Authored: Wed Jun 10 17:08:37 2015 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Wed Jun 10 17:08:37 2015 -0700

----------------------------------------------------------------------
CHANGELOG | 41 +++
bin/mahout | 4 +-
.../apache/mahout/h2obindings/drm/H2OBCast.java | 12 +
.../apache/mahout/h2obindings/H2OEngine.scala | 66 ++--
.../org/apache/mahout/logging/package.scala | 73 ++++
.../apache/mahout/math/decompositions/DQR.scala | 9 +-
.../mahout/math/decompositions/DSSVD.scala | 19 +-
.../mahout/math/decompositions/SSVD.scala | 2 +-
.../org/apache/mahout/math/drm/BCast.scala | 3 +-
.../mahout/math/drm/CheckpointedOps.scala | 7 +
.../mahout/math/drm/DistributedEngine.scala | 125 ++++---
.../mahout/math/drm/DrmDoubleScalarOps.scala | 8 +-
.../org/apache/mahout/math/drm/DrmLikeOps.scala | 7 +-
.../apache/mahout/math/drm/RLikeDrmOps.scala | 55 ++-
.../math/drm/logical/AbstractUnaryOp.scala | 2 +-
.../math/drm/logical/CheckpointAction.scala | 3 +-
.../mahout/math/drm/logical/OpAewScalar.scala | 6 +-
.../math/drm/logical/OpAewUnaryFunc.scala | 47 +++
.../math/drm/logical/OpAewUnaryFuncFusion.scala | 62 ++++
.../mahout/math/drm/logical/OpCbind.scala | 2 +-
.../mahout/math/drm/logical/OpCbindScalar.scala | 37 ++
.../mahout/math/drm/logical/OpMapBlock.scala | 2 +-
.../mahout/math/drm/logical/TEwFunc.scala | 37 ++
.../org/apache/mahout/math/drm/package.scala | 50 ++-
.../math/scalabindings/DoubleScalarOps.scala | 42 ---
.../apache/mahout/math/scalabindings/MMul.scala | 295 ++++++++++++++++
.../mahout/math/scalabindings/MatrixOps.scala | 87 ++++-
.../scalabindings/RLikeDoubleScalarOps.scala | 63 ++++
.../math/scalabindings/RLikeMatrixOps.scala | 77 ++++-
.../mahout/math/scalabindings/RLikeOps.scala | 4 +-
.../math/scalabindings/RLikeTimesOps.scala | 28 --
.../math/scalabindings/RLikeVectorOps.scala | 29 +-
.../mahout/math/scalabindings/VectorOps.scala | 45 ++-
.../mahout/math/scalabindings/package.scala | 81 ++++-
.../org/apache/mahout/util/IOUtilsScala.scala | 64 ++++
.../mahout/math/drm/DrmLikeOpsSuiteBase.scala | 20 ++
.../mahout/math/drm/DrmLikeSuiteBase.scala | 3 +-
.../mahout/math/drm/RLikeDrmOpsSuiteBase.scala | 94 ++++-
.../math/scalabindings/MatrixOpsSuite.scala | 33 +-
.../scalabindings/RLikeMatrixOpsSuite.scala | 276 +++++++++++++++
.../math/scalabindings/VectorOpsSuite.scala | 19 +-
.../org/apache/mahout/math/AbstractMatrix.java | 24 +-
.../org/apache/mahout/math/ConstantVector.java | 5 +
.../apache/mahout/math/DelegatingVector.java | 5 +
.../org/apache/mahout/math/DenseMatrix.java | 9 +-
.../mahout/math/DenseSymmetricMatrix.java | 2 +
.../org/apache/mahout/math/DenseVector.java | 5 +
.../org/apache/mahout/math/DiagonalMatrix.java | 14 +
.../math/FileBasedSparseBinaryMatrix.java | 5 +
.../mahout/math/FunctionalMatrixView.java | 9 +
.../java/org/apache/mahout/math/Matrices.java | 18 +-
.../java/org/apache/mahout/math/Matrix.java | 7 +
.../apache/mahout/math/MatrixVectorView.java | 5 +
.../java/org/apache/mahout/math/MatrixView.java | 6 +
.../org/apache/mahout/math/NamedVector.java | 5 +
.../apache/mahout/math/PermutedVectorView.java | 5 +
.../mahout/math/RandomAccessSparseVector.java | 5 +
.../math/SequentialAccessSparseVector.java | 7 +
.../apache/mahout/math/SparseColumnMatrix.java | 20 +-
.../org/apache/mahout/math/SparseMatrix.java | 30 +-
.../org/apache/mahout/math/SparseRowMatrix.java | 7 +
.../mahout/math/TransposedMatrixView.java | 147 ++++++++
.../org/apache/mahout/math/UpperTriangular.java | 9 +
.../java/org/apache/mahout/math/Vector.java | 8 +
.../org/apache/mahout/math/VectorIterable.java | 4 +
.../java/org/apache/mahout/math/VectorView.java | 7 +-
.../org/apache/mahout/math/flavor/BackEnum.java | 26 ++
.../apache/mahout/math/flavor/MatrixFlavor.java | 82 +++++
.../math/flavor/TraversingStructureEnum.java | 48 +++
.../org/apache/mahout/math/MatricesTest.java | 4 +-
.../math/hadoop/DistributedRowMatrix.java | 5 +
.../stochasticsvd/qr/GivensThinSolver.java | 5 +
.../sparkbindings/shell/MahoutSparkILoop.scala | 15 +-
spark/pom.xml | 16 +
.../org/apache/mahout/common/DrmMetadata.scala | 17 +
.../org/apache/mahout/common/HDFSUtil.scala | 4 +-
.../apache/mahout/common/Hadoop1HDFSUtil.scala | 18 +-
.../mahout/sparkbindings/SparkEngine.scala | 227 +++++++++----
.../apache/mahout/sparkbindings/blas/ABt.scala | 200 +++++++++--
.../apache/mahout/sparkbindings/blas/AewB.scala | 75 +++-
.../mahout/sparkbindings/blas/AinCoreB.scala | 16 +-
.../apache/mahout/sparkbindings/blas/At.scala | 15 +-
.../apache/mahout/sparkbindings/blas/AtA.scala | 247 +++++++++-----
.../apache/mahout/sparkbindings/blas/AtB.scala | 339 ++++++++++++++++---
.../apache/mahout/sparkbindings/blas/Ax.scala | 24 +-
.../mahout/sparkbindings/blas/CbindAB.scala | 35 +-
.../mahout/sparkbindings/blas/DrmRddOps.scala | 2 +
.../mahout/sparkbindings/blas/MapBlock.scala | 15 +-
.../apache/mahout/sparkbindings/blas/Par.scala | 74 ++--
.../mahout/sparkbindings/blas/RbindAB.scala | 8 +-
.../mahout/sparkbindings/blas/Slicing.scala | 2 +-
.../mahout/sparkbindings/blas/package.scala | 174 +++++++++-
.../drm/CheckpointedDrmSpark.scala | 41 ++-
.../drm/CheckpointedDrmSparkOps.scala | 2 +-
.../mahout/sparkbindings/drm/DrmRddInput.scala | 18 +-
.../mahout/sparkbindings/drm/SparkBCast.scala | 2 +
.../mahout/sparkbindings/drm/package.scala | 23 +-
.../io/GenericMatrixKryoSerializer.scala | 189 +++++++++++
.../io/MahoutKryoRegistrator.scala | 28 +-
.../io/UnsupportedSerializer.scala | 31 ++
.../sparkbindings/io/VectorKryoSerializer.scala | 252 ++++++++++++++
.../apache/mahout/sparkbindings/package.scala | 118 +++----
.../sparkbindings/SparkBindingsSuite.scala | 12 +-
.../mahout/sparkbindings/blas/BlasSuite.scala | 4 +-
.../sparkbindings/drm/DrmLikeOpsSuite.scala | 17 +-
.../sparkbindings/drm/RLikeDrmOpsSuite.scala | 63 ++++
.../mahout/sparkbindings/io/IOSuite.scala | 195 +++++++++++
.../test/DistributedSparkSuite.scala | 15 +-
.../test/LoggerConfiguration.scala | 2 +-
109 files changed, 4310 insertions(+), 702 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 89c2cbc..dd65b0e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,47 @@ Mahout Change Log
 Release 0.10.2 - unreleased
+ MAHOUT-1660: Hadoop1HDFSUtil.readDRMHEader should be taking Hadoop conf (dlyubimov)
+
+ MAHOUT-1713: Performance and parallelization improvements for AB', A'B, A'A spark physical operators (dlyubimov)
+
+ MAHOUT-1714: Add MAHOUT_OPTS environment when running Spark shell (dlyubimov)
+
+ MAHOUT-1715: Closeable API for broadcast tensors (dlyubimov)
+
+ MAHOUT-1716: Scala logging style (dlyubimov)
+
+ MAHOUT-1717: allreduceBlock() operator api and Spark implementation (dlyubimov)
+
+ MAHOUT-1718: Support for conversion of any type-keyed DRM into ordinally-keyed DRM (dlyubimov)
+
+ MAHOUT-1719: Unary elementwise function operator and function fusions (dlyubimov)
+
+ MAHOUT-1720: Support 1 cbind X, X cbind 1 etc.
for both Matrix and DRM (dlyubimov) + + MAHOUT-1721: rowSumsMap() summary for non-int-keyed DRMs (dlyubimov) + + MAHOUT-1722: DRM row sampling api (dlyubimov) + + MAHOUT-1723: Optional structural "flavor" abstraction for in-core matrices (dlyubimov) + + MAHOUT-1724: Optimizations of matrix-matrix in-core multiplication based on structural flavors (dlyubimov) + + MAHOUT-1725: elementwise power operator ^ (dlyubimov) + + MAHOUT-1726: R-like vector concatenation operator (dlyubimov) + + MAHOUT-1727: Elementwise analogues of scala.math functions for tensor types (dlyubimov) + + MAHOUT-1728: In-core functional assignments (dlyubimov) + + MAHOUT-1729: Straighten out behavior of Matrix.iterator() and iterateNonEmpty() (dlyubimov) + + MAHOUT-1730: New mutable transposition view for in-core matrices (dlyubimov) + + MAHOUT-1731: Deprecate SparseColumnMatrix (dlyubimov) + + MAHOUT-1732: Native support for kryo serialization of tensor types (dlyubimov) Release 0.10.1 - 2015-05-31 http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/bin/mahout ---------------------------------------------------------------------- diff --git a/bin/mahout b/bin/mahout index ee0b918..24f01ba 100755 --- a/bin/mahout +++ b/bin/mahout @@ -254,12 +254,10 @@ fi # restore ordinary behaviour unset IFS - - case "$1" in (spark-shell) save_stty=$(stty -g 2>/dev/null); - "$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@ + "$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@ stty sane; stty $save_stty ;; # Spark CLI drivers go here http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java ---------------------------------------------------------------------- diff --git a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java index 523a771..ebcc626 100644 --- 
a/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java +++ b/h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java @@ -118,4 +118,16 @@ public class H2OBCast<T> implements BCast<T>, Serializable { } return ret; } + + /** + * Stop broadcasting when called on driver side. Release any network resources. + * + */ + @Override + public void close() throws IOException { + + // TODO: review this. It looks like it is not really a broadcast mechanism but rather just a + // serialization wrapper. In which case it doesn't hold any network resources. + + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 173d5a0..e0ac302 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -26,9 +26,13 @@ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.h2obindings.ops._ import org.apache.mahout.h2obindings.drm._ import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil} +import org.apache.mahout.logging._ /** H2O specific non-DRM operations */ object H2OEngine extends DistributedEngine { + + private final implicit val log = getLog(H2OEngine.getClass) + // By default, use Hadoop 1 utils var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil @@ -119,40 +123,64 @@ object H2OEngine extends DistributedEngine { abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, val columnIDs: BiDictionary) extends IndexedDataset {} - /** - * reads an IndexedDatasetH2O from default text delimited files + /** + * Reads an IndexedDatasetH2O from default text delimited files * @todo unimplemented * @param src a comma separated list of URIs to read 
from * @param schema how the text file is formatted * @return */ def indexedDatasetDFSRead(src: String, - schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: Option[BiDictionary] = None) - (implicit sc: DistributedContext): - IndexedDatasetH2O = { - // should log a warning when this is built but no logger here, can an H2O contributor help with this - println("Warning: unimplemented indexedDatasetDFSReadElements." ) - throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.") - null.asInstanceOf[IndexedDatasetH2O] + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: Option[BiDictionary] = None) + (implicit sc: DistributedContext): + IndexedDatasetH2O = { + + error("Unimplemented indexedDatasetDFSReadElements.") + + ??? } /** - * reads an IndexedDatasetH2O from default text delimited files + * Reads an IndexedDatasetH2O from default text delimited files * @todo unimplemented * @param src a comma separated list of URIs to read from * @param schema how the text file is formatted * @return */ def indexedDatasetDFSReadElements(src: String, - schema: Schema = DefaultIndexedDatasetReadSchema, - existingRowIDs: Option[BiDictionary] = None) - (implicit sc: DistributedContext): - IndexedDatasetH2O = { - // should log a warning when this is built but no logger here, can an H2O contributor help with this - println("Warning: unimplemented indexedDatasetDFSReadElements." ) - throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.") - null.asInstanceOf[IndexedDatasetH2O] + schema: Schema = DefaultIndexedDatasetReadSchema, + existingRowIDs: Option[BiDictionary] = None) + (implicit sc: DistributedContext): IndexedDatasetH2O = { + + error("Unimplemented indexedDatasetDFSReadElements.") + + ??? } + /** + * Optional engine-specific all reduce tensor operation. + * + * TODO: implement this please. 
+ * + */ + override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc) + : Matrix = ??? + + /** + * TODO: implement this please. + */ + override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ??? + + /** + * (Optional) Sampling operation. Consistent with Spark semantics of the same. + * TODO: implement this please. + */ + override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ??? + + /** + * TODO: implement this please. + */ + override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean) + : (DrmLike[Int], Option[DrmLike[K]]) = ??? } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/logging/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/logging/package.scala b/math-scala/src/main/scala/org/apache/mahout/logging/package.scala new file mode 100644 index 0000000..15aa909 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/logging/package.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout + +import org.apache.log4j.{Level, Priority, Logger} + +package object logging { + + /** Compute `expr` if debug is on, only */ + def debugDo[T](expr: => T)(implicit log: Logger): Option[T] = { + if (log.isDebugEnabled) Some(expr) + else None + } + + /** Compute `expr` if trace is on, only */ + def traceDo[T](expr: => T)(implicit log: Logger): Option[T] = { + if (log.isTraceEnabled) Some(expr) else None + } + + /** Shorter, and lazy, versions of logging methods. Just declare log implicit. */ + def debug(msg: => AnyRef)(implicit log: Logger) { if (log.isDebugEnabled) log.debug(msg) } + + def debug(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isDebugEnabled()) log.debug(msg, t) } + + /** Shorter, and lazy, versions of logging methods. Just declare log implicit. */ + def trace(msg: => AnyRef)(implicit log: Logger) { if (log.isTraceEnabled) log.trace(msg) } + + def trace(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isTraceEnabled()) log.trace(msg, t) } + + def info(msg: => AnyRef)(implicit log: Logger) { if (log.isInfoEnabled) log.info(msg)} + + def info(msg: => AnyRef, t:Throwable)(implicit log: Logger) { if (log.isInfoEnabled) log.info(msg,t)} + + def warn(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.WARN)) log.warn(msg) } + + def warn(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.WARN)) error(msg, t) } + + def error(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.ERROR)) log.warn(msg) } + + def error(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.ERROR)) error(msg, t) } + + def fatal(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.FATAL)) log.fatal(msg) } + + def fatal(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.FATAL)) 
log.fatal(msg, t) } + + def getLog(name: String): Logger = Logger.getLogger(name) + + def getLog(clazz: Class[_]): Logger = Logger.getLogger(clazz) + + def mahoutLog :Logger = getLog("org.apache.mahout") + + def setLogLevel(l:Level)(implicit log:Logger) = { + log.setLevel(l) + } + + def setAdditivity(a:Boolean)(implicit log:Logger) = log.setAdditivity(a) + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala index 7caa3dd..866ee34 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala @@ -18,6 +18,7 @@ package org.apache.mahout.math.decompositions import scala.reflect.ClassTag +import org.apache.mahout.logging._ import org.apache.mahout.math.Matrix import org.apache.mahout.math.scalabindings._ import RLikeOps._ @@ -27,7 +28,7 @@ import org.apache.log4j.Logger object DQR { - private val log = Logger.getLogger(DQR.getClass) + private final implicit val log = getLog(DQR.getClass) /** * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty @@ -41,19 +42,19 @@ object DQR { def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = { if (drmA.ncol > 5000) - log.warn("A is too fat. A'A must fit in memory and easily broadcasted.") + warn("A is too fat. 
A'A must fit in memory and easily broadcasted.") implicit val ctx = drmA.context val AtA = (drmA.t %*% drmA).checkpoint() val inCoreAtA = AtA.collect - if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA)) + trace("A'A=\n%s\n".format(inCoreAtA)) val ch = chol(inCoreAtA) val inCoreR = (ch.getL cloned) t - if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR)) + trace("R=\n%s\n".format(inCoreR)) if (checkRankDeficiency && !ch.isPositiveDefinite) throw new IllegalArgumentException("R is rank-deficient.") http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala index 1abfb87..cecaec8 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala @@ -7,9 +7,12 @@ import RLikeOps._ import org.apache.mahout.math.drm._ import RLikeDrmOps._ import org.apache.mahout.common.RandomUtils +import org.apache.mahout.logging._ object DSSVD { + private final implicit val log = getLog(DSSVD.getClass) + /** * Distributed Stochastic Singular Value decomposition algorithm. * @@ -43,18 +46,22 @@ object DSSVD { case (keys, blockA) => val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) keys -> blockY - } + }.checkpoint() - var drmQ = dqrThin(drmY.checkpoint())._1 + var drmQ = dqrThin(drmY)._1 // Checkpoint Q if last iteration if (q == 0) drmQ = drmQ.checkpoint() + trace(s"dssvd:drmQ=${drmQ.collect}.") + // This actually should be optimized as identically partitioned map-side A'B since A and Q should // still be identically partitioned. 
var drmBt = drmAcp.t %*% drmQ // Checkpoint B' if last iteration if (q == 0) drmBt = drmBt.checkpoint() + trace(s"dssvd:drmB'=${drmBt.collect}.") + for (i <- 0 until q) { drmY = drmAcp %*% drmBt drmQ = dqrThin(drmY.checkpoint())._1 @@ -62,13 +69,17 @@ object DSSVD { if (i == q - 1) drmQ = drmQ.checkpoint() // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not - // identically partitioned anymore. + // identically partitioned anymore.` drmBt = drmAcp.t %*% drmQ // Checkpoint B' if last iteration if (i == q - 1) drmBt = drmBt.checkpoint() } - val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt) + val mxBBt:Matrix = drmBt.t %*% drmBt + + trace(s"dssvd: BB'=$mxBBt.") + + val (inCoreUHat, d) = eigen(mxBBt) val s = d.sqrt // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala index 80385a3..e1b2f03 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala @@ -150,7 +150,7 @@ private[math] object SSVD { val c = s_q cross s_b // BB' computation becomes - val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi) + val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi) val (uhat, d) = eigen(bbt) http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala index 850614457..b86e286 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala @@ -18,6 +18,7 @@ package org.apache.mahout.math.drm /** Broadcast variable abstraction */ -trait BCast[T] { +trait BCast[T] extends java.io.Closeable { def value:T + } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala index 8c3911f..c43c6c7 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala @@ -20,6 +20,7 @@ package org.apache.mahout.math.drm import scala.reflect.ClassTag import org.apache.mahout.math._ +import org.apache.mahout.math.scalabindings.RLikeOps._ /** * Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to @@ -38,6 +39,12 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) { /** Column Means */ def colMeans(): Vector = drm.context.colMeans(drm) + /** Optional engine-specific all reduce tensor operation. 
*/ + def allreduceBlock(bmf: BlockMapFunc2[K], rf: BlockReduceFunc = _ += _): Matrix = + + drm.context.allreduceBlock(drm, bmf, rf) + + def norm():Double = drm.context.norm(drm) } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index bb6772a..519a127 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -19,16 +19,15 @@ package org.apache.mahout.math.drm import org.apache.mahout.math.indexeddataset._ -import scala.reflect.ClassTag import logical._ import org.apache.mahout.math._ import scalabindings._ import RLikeOps._ -import RLikeDrmOps._ import DistributedEngine._ -import org.apache.mahout.math.scalabindings._ import org.apache.log4j.Logger +import scala.reflect.ClassTag + /** Abstraction of optimizer/distributed engine */ trait DistributedEngine { @@ -37,7 +36,7 @@ trait DistributedEngine { * introduce logical constructs (including engine-specific ones) that user DSL cannot even produce * per se. * <P> - * + * * A particular physical engine implementation may choose to either use the default rewrites or * build its own rewriting rules. * <P> @@ -50,6 +49,9 @@ trait DistributedEngine { /** Engine-specific colSums implementation based on a checkpoint. */ def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector + /** Optional engine-specific all reduce tensor operation. */ + def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix + /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. 
*/ def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector @@ -73,20 +75,39 @@ trait DistributedEngine { def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ - def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[Int] + def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext): + CheckpointedDrm[Int] /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ - def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[String] + def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext): + CheckpointedDrm[String] /** This creates an empty DRM with specified number of partitions and cardinality. */ - def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Int] + def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext): + CheckpointedDrm[Int] /** Creates empty DRM with non-trivial height */ - def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Long] + def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext): + CheckpointedDrm[Long] + + /** + * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys + * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. + */ + def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) + + /** + * (Optional) Sampling operation. 
Consistent with Spark semantics of the same. + * @param drmX + * @param fraction + * @param replacement + * @tparam K + * @return + */ + def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] + + def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix + /** * Load IndexedDataset from text delimited format. * @param src comma delimited URIs to read from @@ -119,38 +140,49 @@ object DistributedEngine { private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { action match { - case OpAB(OpAt(a), b) if (a == b) => OpAtA(pass1(a)) - case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) => OpAtA(pass1(a)) + + // self element-wise rewrite + case OpAewB(a, b, op) if (a == b) => { + op match { + case "*" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x * x) + case "/" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x / x) + // Self "+" and "-" don't make a lot of sense, but we do include it for completeness. + case "+" ⇒ OpAewUnaryFunc(pass1(a), 2.0 * _) + case "-" ⇒ OpAewUnaryFunc(pass1(a), (_) ⇒ 0.0) + case _ ⇒ + require(false, s"Unsupported operator $op") + null + } + } + case OpAB(OpAt(a), b) if (a == b) ⇒ OpAtA(pass1(a)) + case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) ⇒ OpAtA(pass1(a)) // For now, rewrite left-multiply via transpositions, i.e. // inCoreA %*% B = (B' %*% inCoreA')' - case op@OpTimesLeftMatrix(a, b) => - OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t)) + case op@OpTimesLeftMatrix(a, b) ⇒ + OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t)) // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments - case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) => + case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) ⇒ // Make sure closure sees only local vals, not attributes. We need to do these ugly casts // around because compiler could not infer that K is the same as Int, based on if() above.
val ma = safeToNonNegInt(a.nrow) - val bAdjusted = new OpMapBlock[Int, Int]( - A = pass1(b.asInstanceOf[DrmLike[Int]]), - bmf = { - case (keys, block) => keys.map(_ + ma) -> block - }, - identicallyPartitioned = false - ) + val bAdjusted = new OpMapBlock[Int, Int](A = pass1(b.asInstanceOf[DrmLike[Int]]), bmf = { + case (keys, block) ⇒ keys.map(_ + ma) → block + }, identicallyPartitioned = false) val aAdjusted = a.asInstanceOf[DrmLike[Int]] OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]] // Stop at checkpoints - case cd: CheckpointedDrm[_] => action + case cd: CheckpointedDrm[_] ⇒ action // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => + case uop: AbstractUnaryOp[_, K] ⇒ uop.A = pass1(uop.A)(uop.classTagA) uop - case bop: AbstractBinaryOp[_, _, K] => + + case bop: AbstractBinaryOp[_, _, K] ⇒ bop.A = pass1(bop.A)(bop.classTagA) bop.B = pass1(bop.B)(bop.classTagB) bop @@ -160,17 +192,30 @@ object DistributedEngine { /** This would remove stuff like A.t.t that previous step may have created */ private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { action match { + + // Fusion of nested unary funcs into a single one, like 1 + x * x. + // Since we repeat the pass over self after the rewrite, we don't need to descend into arguments + // recursively here. + case op1@OpAewUnaryFunc(op2@OpAewUnaryFunc(a, _, _), _, _) ⇒ + pass2(OpAewUnaryFuncFusion(a, op1 :: op2 :: Nil)) + + // Fusion one step further, like 1 + 2 * x * x. All of it should be rewritten as one OpAewUnaryFuncFusion. + // Since we repeat the pass over self after the rewrite, we don't need to descend into arguments + // recursively here.
+ case op@OpAewUnaryFuncFusion(op2@OpAewUnaryFunc(a, _, _), _) ⇒ + pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2)) + // A.t.t => A - case OpAt(top@OpAt(a)) => pass2(a)(top.classTagA) + case OpAt(top@OpAt(a)) ⇒ pass2(a)(top.classTagA) // Stop at checkpoints - case cd: CheckpointedDrm[_] => action + case cd: CheckpointedDrm[_] ⇒ action // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => + case uop: AbstractUnaryOp[_, K] ⇒ uop.A = pass2(uop.A)(uop.classTagA) uop - case bop: AbstractBinaryOp[_, _, K] => + case bop: AbstractBinaryOp[_, _, K] ⇒ bop.A = pass2(bop.A)(bop.classTagA) bop.B = pass2(bop.B)(bop.classTagB) bop @@ -182,29 +227,29 @@ object DistributedEngine { action match { // matrix products. - case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b)) + case OpAB(a, OpAt(b)) ⇒ OpABt(pass3(a), pass3(b)) // AtB cases that make sense. - case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) => OpAtB(pass3(a), pass3(b)) - case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b)) + case OpAB(OpAt(a), b) if (a.partitioningTag == b.partitioningTag) ⇒ OpAtB(pass3(a), pass3(b)) + case OpABAnyKey(OpAtAnyKey(a), b) ⇒ OpAtB(pass3(a), pass3(b)) // Need some cost to choose between the following.
- case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b)) + case OpAB(OpAt(a), b) ⇒ OpAtB(pass3(a), pass3(b)) // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a))) - case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b))) + case OpAB(a, b) ⇒ OpABt(pass3(a), OpAt(pass3(b))) // Rewrite A'x - case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x) + case op@OpAx(op1@OpAt(a), x) ⇒ OpAtx(pass3(a)(op1.classTagA), x) // Stop at checkpoints - case cd: CheckpointedDrm[_] => action + case cd: CheckpointedDrm[_] ⇒ action // For everything else we just pass-thru the operator arguments to optimizer - case uop: AbstractUnaryOp[_, K] => + case uop: AbstractUnaryOp[_, K] ⇒ uop.A = pass3(uop.A)(uop.classTagA) uop - case bop: AbstractBinaryOp[_, _, K] => + case bop: AbstractBinaryOp[_, _, K] ⇒ bop.A = pass3(bop.A)(bop.classTagA) bop.B = pass3(bop.B)(bop.classTagB) bop http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala index e5cf563..96ef893 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala @@ -18,7 +18,11 @@ package org.apache.mahout.math.drm import RLikeDrmOps._ -import scala.reflect.ClassTag +import org.apache.mahout.math._ +import org.apache.mahout.math.drm.logical.OpCbindScalar +import scalabindings._ +import RLikeOps._ +import reflect.ClassTag class DrmDoubleScalarOps(val x:Double) extends AnyVal{ @@ -30,4 +34,6 @@ class DrmDoubleScalarOps(val x:Double) extends AnyVal{ def /[K:ClassTag](that:DrmLike[K]) = x /: that + def cbind[K: ClassTag](that: DrmLike[K]) = OpCbindScalar(A = that, x = x, leftBind = true) + }
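The `pass1` and `pass2` changes to `DistributedEngine` above turn a self element-wise operation such as `A * A` into a single `OpAewUnaryFunc`, then collapse chains of unary functions into one `OpAewUnaryFuncFusion`, whose closure applies its function list right to left with `foldRight` (`:\`). The following is a minimal, self-contained sketch of that idea, assuming a toy expression tree in which a "matrix" is just a flat `Seq[Double]` of its elements. `FusionSketch`, `Expr`, `Leaf`, `AewB`, `UnaryFunc`, `Fusion`, `rewrite`, `fuse`, `compose`, `applyOp` and `eval` are hypothetical names for illustration only, not Mahout API, and partitioning, sparsity and checkpoints are ignored.

```scala
// Hypothetical toy model of the element-wise rewrites in DistributedEngine.pass1/pass2.
// Not Mahout API: a "matrix" is reduced to a flat Seq[Double] of its elements.
object FusionSketch {

  sealed trait Expr
  final case class Leaf(values: Seq[Double]) extends Expr
  final case class AewB(a: Expr, b: Expr, op: String) extends Expr
  final case class UnaryFunc(a: Expr, f: Double => Double) extends Expr
  // ff is ordered outermost function first, as in `op1 :: op2 :: Nil`.
  final case class Fusion(a: Expr, ff: List[Double => Double]) extends Expr

  // pass1-style rewrite: "A op A" on the same operand becomes a unary function of A.
  def rewrite(e: Expr): Expr = e match {
    case AewB(a, b, op) if a == b =>
      val ra = rewrite(a)
      op match {
        case "*"   => UnaryFunc(ra, x => x * x)
        case "/"   => UnaryFunc(ra, x => x / x)
        case "+"   => UnaryFunc(ra, x => 2.0 * x)
        case "-"   => UnaryFunc(ra, _ => 0.0)
        case other => throw new IllegalArgumentException(s"Unsupported operator $other")
      }
    case AewB(a, b, op)  => AewB(rewrite(a), rewrite(b), op)
    case UnaryFunc(a, f) => UnaryFunc(rewrite(a), f)
    case Fusion(a, ff)   => Fusion(rewrite(a), ff)
    case leaf: Leaf      => leaf
  }

  // pass2-style fusion: nested unary functions collapse into one Fusion node.
  // The rewritten node is passed back through fuse, so longer chains fold in too.
  def fuse(e: Expr): Expr = e match {
    case UnaryFunc(UnaryFunc(a, inner), outer) => fuse(Fusion(a, outer :: inner :: Nil))
    case Fusion(UnaryFunc(a, inner), ff)       => fuse(Fusion(a, ff :+ inner))
    case UnaryFunc(a, f)                       => UnaryFunc(fuse(a), f)
    case Fusion(a, ff)                         => Fusion(fuse(a), ff)
    case AewB(a, b, op)                        => AewB(fuse(a), fuse(b), op)
    case leaf: Leaf                            => leaf
  }

  // Same composition order as OpAewUnaryFuncFusion.f: foldRight applies the
  // last (innermost) function first and the head (outermost) function last.
  def compose(ff: List[Double => Double]): Double => Double =
    (x: Double) => ff.foldRight(x)((f, acc) => f(acc))

  def applyOp(op: String, p: Double, q: Double): Double = op match {
    case "+"   => p + q
    case "-"   => p - q
    case "*"   => p * q
    case "/"   => p / q
    case other => throw new IllegalArgumentException(s"Unsupported operator $other")
  }

  // Element-by-element evaluation of the tree.
  def eval(e: Expr): Seq[Double] = e match {
    case Leaf(v)         => v
    case UnaryFunc(a, f) => eval(a).map(f)
    case Fusion(a, ff)   => eval(a).map(compose(ff))
    case AewB(a, b, op)  => eval(a).zip(eval(b)).map { case (p, q) => applyOp(op, p, q) }
  }

  def main(args: Array[String]): Unit = {
    val a = Leaf(Seq(1.0, 2.0, 3.0))
    // 1 + 2 * (A * A), written as nested element-wise nodes.
    val expr = UnaryFunc(UnaryFunc(AewB(a, a, "*"), x => 2.0 * x), x => 1.0 + x)
    val optimized = fuse(rewrite(expr))
    println(optimized.isInstanceOf[Fusion]) // true
    println(eval(optimized))                // List(3.0, 9.0, 19.0)
  }
}
```

In this sketch the three nested nodes of `1 + 2 * (A * A)` end up as a single `Fusion` over the leaf, which computes 1 + 2x² per element in one pass and yields the same values as the unfused tree. It leaves out `evalZeros`, which `OpAewUnaryFuncFusion` sets to true whenever any of its fused functions has it set.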
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala index bc937d6..19432d0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala @@ -49,7 +49,7 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) { * is applied. */ def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = { - assert(min >= 0 || exact >= 0 || auto, "Invalid argument") + require(min > 0 || exact > 0 || auto, "Invalid argument") OpPar(drm, minSplits = min, exactSplits = exact) } @@ -65,16 +65,15 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) { * @tparam R * @return */ - def mapBlock[R: ClassTag](ncol: Int = -1, identicallyParitioned: Boolean = true) + def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true) (bmf: BlockMapFunc[K, R]): DrmLike[R] = new OpMapBlock[K, R]( A = drm, bmf = bmf, _ncol = ncol, - identicallyPartitioned = identicallyParitioned + identicallyPartitioned = identicallyPartitioned ) - /** * Slicing the DRM. Should eventually work just like in-core drm (e.g. 
A(0 until 5, 5 until 15)).<P> * http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala index 380f4eb..7927e51 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala @@ -18,12 +18,17 @@ package org.apache.mahout.math.drm import scala.reflect.ClassTag +import collection._ +import JavaConversions._ import org.apache.mahout.math.{Vector, Matrix} import org.apache.mahout.math.drm.logical._ +import org.apache.mahout.math.scalabindings._ +import RLikeOps._ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { import RLikeDrmOps._ + import org.apache.mahout.math.scalabindings._ def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+") @@ -33,21 +38,23 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/") - def +(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+") + def +(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ + that, evalZeros = true) - def +:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "+") + def +:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that + _, evalZeros = true) - def -(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-") + def -(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ - that, evalZeros = true) - def -:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "-:") + def -:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that - _, evalZeros = true) - 
def *(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*") + def *(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ * that) - def *:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "*") + def *:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that * _) - def /(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/") + def ^(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = math.pow(_, that), evalZeros = that <= 0.0) - def /:(that: Double): DrmLike[K] = OpAewScalar[K](A = this, scalar = that, op = "/:") + def /(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ / that, evalZeros = that == 0.0) + + def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, evalZeros = true) def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that) @@ -65,18 +72,36 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { def t: DrmLike[Int] = OpAtAnyKey(A = drm) - def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that) + def cbind(that: DrmLike[K]): DrmLike[K] = OpCbind(A = this.drm, B = that) + + def cbind(that: Double): DrmLike[K] = OpCbindScalar(A = this.drm, x = that, leftBind = false) + + def rbind(that: DrmLike[K]): DrmLike[K] = OpRbind(A = this.drm, B = that) - def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that) + /** + * `rowSums` method for non-int-keyed matrices. + * + * A limitation here is the in-memory representation of Colt's Matrix, which can + * only have String row labels. Therefore, internally we call ".toString()" on each key object + * and put it into the [[Matrix]] row label bindings, at which point the keys are coerced to Strings. + * + * This is suboptimal behavior; a future enhancement of `collect` should remove it (TODO). + * + * @return map of row keys into row sums, front-end collected.
+ */ + def rowSumsMap(): Map[String, Double] = { + val m = drm.mapBlock(ncol = 1) { case (keys, block) => + keys -> dense(block.rowSums).t + }.collect + m.getRowLabelBindings.map { case (key, idx) => key -> m(idx, 0)} + } } class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { import org.apache.mahout.math._ import scalabindings._ - import RLikeOps._ import RLikeDrmOps._ - import scala.collection.JavaConversions._ override def t: DrmLike[Int] = OpAt(A = drm) @@ -108,7 +133,7 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { // Collect block-wise row means and output them as one-column matrix. keys -> dense(block.rowMeans).t } - .collect(::, 0) + .collect(::, 0) } /** Return diagonal vector */ @@ -117,14 +142,14 @@ class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { drm.mapBlock(ncol = 1) { case (keys, block) => keys -> dense(for (r <- block.view) yield r(keys(r.index))).t } - .collect(::, 0) + .collect(::, 0) } } object RLikeDrmOps { - implicit def double2ScalarOps(x:Double) = new DrmDoubleScalarOps(x) + implicit def double2ScalarOps(x: Double) = new DrmDoubleScalarOps(x) implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm) http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala index a445f21..60b2c77 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala @@ -24,7 +24,7 @@ import org.apache.mahout.math.drm.{DistributedContext, DrmLike} abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag] extends 
CheckpointAction[K] with DrmLike[K] { - protected[drm] var A: DrmLike[A] + protected[mahout] var A: DrmLike[A] lazy val context: DistributedContext = A.context http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala index aa3a3b9..a7934a3 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala @@ -37,7 +37,8 @@ abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] { */ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp match { case None => - val physPlan = context.toPhysical(context.optimizerRewrite(this), cacheHint) + val plan = context.optimizerRewrite(this) + val physPlan = context.toPhysical(plan, cacheHint) cp = Some(physPlan) physPlan case Some(cp) => cp http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala index 19a910c..dbcb366 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala @@ -21,7 +21,11 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike import scala.util.Random -/** Operator denoting expressions like 5.0 - A or A * 5.6 */ +/** + * Operator denoting expressions like 5.0 - A or A * 5.6 + 
* + * @deprecated use [[OpAewUnaryFunc]] instead + */ case class OpAewScalar[K: ClassTag]( override var A: DrmLike[K], val scalar: Double, http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala new file mode 100644 index 0000000..71489ab --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.drm.logical + +import scala.reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike +import scala.util.Random + +/** + * @author dmitriy + */ +case class OpAewUnaryFunc[K: ClassTag]( + override var A: DrmLike[K], + val f: (Double) => Double, + val evalZeros:Boolean = false + ) extends AbstractUnaryOp[K,K] with TEwFunc { + + override protected[mahout] lazy val partitioningTag: Long = + if (A.canHaveMissingRows) + Random.nextLong() + else A.partitioningTag + + /** Stuff like `A +1` is always supposed to fix this */ + override protected[mahout] lazy val canHaveMissingRows: Boolean = false + + /** R-like syntax for number of rows. */ + def nrow: Long = A.nrow + + /** R-like syntax for number of columns */ + def ncol: Int = A.ncol +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala new file mode 100644 index 0000000..ed95f4f --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm.logical + +import scala.reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike +import scala.util.Random +import collection._ + +/** + * Composition of unary elementwise functions. + */ +case class OpAewUnaryFuncFusion[K: ClassTag]( + override var A: DrmLike[K], + var ff:List[OpAewUnaryFunc[K]] = Nil + ) extends AbstractUnaryOp[K,K] with TEwFunc { + + override protected[mahout] lazy val partitioningTag: Long = + if (A.canHaveMissingRows) + Random.nextLong() + else A.partitioningTag + + /** Stuff like `A +1` is always supposed to fix this */ + override protected[mahout] lazy val canHaveMissingRows: Boolean = false + + /** R-like syntax for number of rows. */ + def nrow: Long = A.nrow + + /** R-like syntax for number of columns */ + def ncol: Int = A.ncol + + /** Apply to degenerate elements? */ + override def evalZeros: Boolean = ff.exists(_.evalZeros) + + /** the function itself */ + override def f: (Double) => Double = { + + // Make sure composed collection becomes an attribute of this closure because we will be sending + // it to the backend. + val composedFunc = ff.map(_.f) + + // Create functional closure and return. 
+ (x: Double) => (composedFunc :\ x) { case (f, xarg) => f(xarg)} + + } +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala index 1425264..0598551 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala @@ -17,7 +17,7 @@ package org.apache.mahout.math.drm.logical -import scala.reflect.ClassTag +import reflect.ClassTag import org.apache.mahout.math.drm.DrmLike import scala.util.Random http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala new file mode 100644 index 0000000..5aee518 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.mahout.math.drm.logical + +import reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike + +case class OpCbindScalar[K:ClassTag]( + override var A:DrmLike[K], + var x:Double, + val leftBind:Boolean ) extends AbstractUnaryOp[K,K] { + + override protected[mahout] lazy val canHaveMissingRows: Boolean = false + + override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag + + /** R-like syntax for number of rows. */ + def nrow: Long = A.nrow + + /** R-like syntax for number of columns */ + def ncol: Int = A.ncol + 1 + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala index 7299d9e..a1cd718 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala @@ -23,7 +23,7 @@ import RLikeOps._ import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike} import scala.util.Random -class OpMapBlock[S: ClassTag, R: ClassTag]( +case class OpMapBlock[S: ClassTag, R: ClassTag]( override var A: DrmLike[S], val bmf: BlockMapFunc[S, R], val _ncol: Int = -1, http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala new file mode 100644 index 0000000..0eb5f65 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/TEwFunc.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm.logical + +/** + * Trait denoting logical operators providing elementwise operations that work as unary operators + * on each element of a matrix. + */ +trait TEwFunc { + + /** Apply to degenerate (zero) elements? */ + def evalZeros: Boolean + + /** the function itself */ + def f: (Double) => Double + + /** + * Is self-assignment OK? If yes, this may cause side effects when working off a non-serialized cached object + * tree!
+ */ + def selfAssignOk: Boolean = false +} http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index 1fae831..d865b58 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -23,6 +23,8 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.scalabindings._ import scala.reflect.ClassTag +import org.apache.mahout.math.drm.logical.OpAewUnaryFunc +import collection._ package object drm { @@ -34,7 +36,11 @@ package object drm { /** Block-map func */ - type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] => BlockifiedDrmTuple[R] + type BlockMapFunc[S, R] = BlockifiedDrmTuple[S] ⇒ BlockifiedDrmTuple[R] + + type BlockMapFunc2[S] = BlockifiedDrmTuple[S] ⇒ Matrix + + type BlockReduceFunc = (Matrix, Matrix) ⇒ Matrix /** CacheHint type */ // type CacheHint = CacheHint.CacheHint @@ -92,7 +98,7 @@ package object drm { implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect /** Do vertical concatenation of collection of blockified tuples */ - def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = { + private[mahout] def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = { assert(blocks.nonEmpty, "rbind: 0 blocks passed in") if (blocks.size == 1) { // No coalescing required. @@ -115,6 +121,46 @@ package object drm { } } + /** + * Convert an arbitrarily-keyed matrix to an int-keyed matrix. Some algebraic operations accept only int-keyed + * matrices, and this method converts to that form. + * + * @param drmX input to be transcoded + * @param computeMap collect `old key -> int key` map to front-end?
+ * @tparam K key type + * @return Sequentially keyed matrix + (optionally) map from non-int key to [[Int]] key. If the + * key type is actually Int, then we just return the argument with None for the map, + * regardless of computeMap parameter. + */ + def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = + drmX.context.engine.drm2IntKeyed(drmX, computeMap) + + /** + * (Optional) Sampling operation. Consistent with Spark semantics of the same. + * @param drmX + * @param fraction + * @param replacement + * @tparam K + * @return samples + */ + def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = + drmX.context.engine.drmSampleRows(drmX, fraction, replacement) + + def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix = + drmX.context.engine.drmSampleKRows(drmX, numSamples, replacement) + + /////////////////////////////////////////////////////////// + // Elementwise unary functions on distributed operands. 
+ def dexp[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true) + + def dlog[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true) + + def dabs[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs) + + def dsqrt[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt) + + def dsignum[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum) + } package object indexeddataset { http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala deleted file mode 100644 index 9fdd6e5..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/DoubleScalarOps.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math.scalabindings - -import org.apache.mahout.math._ - -class DoubleScalarOps(val x:Double) extends AnyVal{ - - import RLikeOps._ - - def +(that:Matrix) = that + x - - def +(that:Vector) = that + x - - def *(that:Matrix) = that * x - - def *(that:Vector) = that * x - - def -(that:Matrix) = x -: that - - def -(that:Vector) = x -: that - - def /(that:Matrix) = x /: that - - def /(that:Vector) = x /: that - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala new file mode 100644 index 0000000..d0fd393 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.math.scalabindings + +import org.apache.mahout.math._ +import org.apache.mahout.math.flavor.{BackEnum, TraversingStructureEnum} +import org.apache.mahout.math.function.Functions +import RLikeOps._ +import org.apache.mahout.logging._ + +import scala.collection.JavaConversions._ +import scala.collection._ + +object MMul extends MMBinaryFunc { + + private final implicit val log = getLog(MMul.getClass) + + override def apply(a: Matrix, b: Matrix, r: Option[Matrix]): Matrix = { + + require(a.ncol == b.nrow, "Incompatible matrix sizes in matrix multiplication.") + + val (af, bf) = (a.getFlavor, b.getFlavor) + val backs = (af.getBacking, bf.getBacking) + val sd = (af.getStructure, af.isDense, bf.getStructure, bf.isDense) + + val alg: MMulAlg = backs match { + + // Both operands are jvm memory backs. + case (BackEnum.JVMMEM, BackEnum.JVMMEM) ⇒ + + sd match { + + // Multiplication cases by a diagonal matrix. + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _) if (a + .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _ + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _) if (a + .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _ + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _) if (a + .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _ + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _) if (a + .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _ + + case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b + .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _ + case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b + .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _ + case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b + .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _ + case (TraversingStructureEnum.SPARSEROWWISE, _,
TraversingStructureEnum.VECTORBACKED, _) if (b + .isInstanceOf[DiagonalMatrix]) â jvmRWDiag _ + + // Dense-dense cases + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a eq b.t) â jvmDRWAAt _ + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a.t eq b) â jvmDRWAAt _ + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) â jvmRWCW + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.ROWWISE, true) â jvmRWRW + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.COLWISE, true) â jvmCWCW + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a eq b.t) â jvmDCWAAt _ + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a.t eq b) â jvmDCWAAt _ + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) â jvmCWRW + + // Sparse row matrix x sparse row matrix (array of vectors) + case (TraversingStructureEnum.ROWWISE, false, TraversingStructureEnum.ROWWISE, false) â jvmSparseRWRW + case (TraversingStructureEnum.ROWWISE, false, TraversingStructureEnum.COLWISE, false) â jvmSparseRWCW + case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.ROWWISE, false) â jvmSparseCWRW + case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.COLWISE, false) â jvmSparseCWCW + + // Sparse matrix x sparse matrix (hashtable of vectors) + case (TraversingStructureEnum.SPARSEROWWISE, false, TraversingStructureEnum.SPARSEROWWISE, false) â + jvmSparseRowRWRW + case (TraversingStructureEnum.SPARSEROWWISE, false, TraversingStructureEnum.SPARSECOLWISE, false) â + jvmSparseRowRWCW + case (TraversingStructureEnum.SPARSECOLWISE, false, TraversingStructureEnum.SPARSEROWWISE, false) â + jvmSparseRowCWRW + case (TraversingStructureEnum.SPARSECOLWISE, false, TraversingStructureEnum.SPARSECOLWISE, false) â + jvmSparseRowCWCW + + // Sparse 
matrix x non-like + case (TraversingStructureEnum.SPARSEROWWISE, false, TraversingStructureEnum.ROWWISE, _) â jvmSparseRowRWRW + case (TraversingStructureEnum.SPARSEROWWISE, false, TraversingStructureEnum.COLWISE, _) â jvmSparseRowRWCW + case (TraversingStructureEnum.SPARSECOLWISE, false, TraversingStructureEnum.ROWWISE, _) â jvmSparseRowCWRW + case (TraversingStructureEnum.SPARSECOLWISE, false, TraversingStructureEnum.COLWISE, _) â jvmSparseCWCW + case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.SPARSEROWWISE, false) â jvmSparseRWRW + case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.SPARSECOLWISE, false) â jvmSparseRWCW + case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.SPARSEROWWISE, false) â jvmSparseCWRW + case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.SPARSECOLWISE, false) â jvmSparseRowCWCW + + // Everything else including at least one sparse LHS or RHS argument + case (TraversingStructureEnum.ROWWISE, false, TraversingStructureEnum.ROWWISE, _) â jvmSparseRWRW + case (TraversingStructureEnum.ROWWISE, false, TraversingStructureEnum.COLWISE, _) â jvmSparseRWCW + case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.ROWWISE, _) â jvmSparseCWRW + case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.COLWISE, _) â jvmSparseCWCW2flips + + // Sparse methods are only effective if the first argument is sparse, so we need to do a swap. + case (_, _, _, false) â { (a, b, r) â apply(b.t, a.t, r.map {_.t}).t } + + // Default jvm-jvm case. 
+          case _ ⇒ jvmRWCW
+        }
+    }
+
+    alg(a, b, r)
+  }
+
+  type MMulAlg = MMBinaryFunc
+
+  @inline
+  private def jvmRWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+
+    require(r.forall(mxR ⇒ mxR.nrow == a.nrow && mxR.ncol == b.ncol))
+    val (m, n) = (a.nrow, b.ncol)
+
+    val mxR = r.getOrElse(if (a.getFlavor.isDense) a.like(m, n) else b.like(m, n))
+
+    for (row ← 0 until mxR.nrow; col ← 0 until mxR.ncol) {
+      // this vector-vector should be sort of optimized, right?
+      mxR(row, col) = a(row, ::) dot b(::, col)
+    }
+    mxR
+  }
+
+
+  @inline
+  private def jvmRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+
+    // A bit hackish: currently, this relies a bit on the fact that like produces RW(?)
+    val bclone = b.like(b.ncol, b.nrow).t
+    for (brow ← b) bclone(brow.index(), ::) := brow
+
+    require(bclone.getFlavor.getStructure == TraversingStructureEnum.COLWISE || bclone.getFlavor.getStructure ==
+      TraversingStructureEnum.SPARSECOLWISE, "COL wise conversion assumption of RHS is wrong, do over this code.")
+
+    jvmRWCW(a, bclone, r)
+  }
+
+  private def jvmCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+    jvmRWRW(b.t, a.t, r.map(_.t)).t
+  }
+
+  private def jvmCWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+    // This is a primary contender with Outer Prod sum algo.
+    // Here, we force-reorient both matrices and run RWCW.
+    // A bit hackish: currently, this relies a bit on the fact that clone always produces RW(?)
+    val aclone = a.cloned
+
+    require(aclone.getFlavor.getStructure == TraversingStructureEnum.ROWWISE || aclone.getFlavor.getStructure ==
+      TraversingStructureEnum.SPARSEROWWISE, "Row wise conversion assumption of RHS is wrong, do over this code.")
+
+    jvmRWRW(aclone, b, r)
+  }
+
+  private def jvmSparseRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+    val mxR = r.getOrElse(b.like(a.nrow, b.ncol))
+
+    // This is basically almost the algorithm from SparseMatrix.times
+    for (arow ← a; ael ← arow.nonZeroes)
+      mxR(arow.index(), ::).assign(b(ael.index, ::), Functions.plusMult(ael))
+
+    mxR
+  }
+
+  private def jvmSparseRowRWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+    val mxR = r.getOrElse(b.like(a.nrow, b.ncol))
+    for (arow ← a.iterateNonEmpty(); ael ← arow.vector.nonZeroes)
+      mxR(arow.index(), ::).assign(b(ael.index, ::), Functions.plusMult(ael))
+
+    mxR
+  }
+
+  private def jvmSparseRowCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRowRWRW(b.t, a.t, r.map(_.t)).t
+
+  private def jvmSparseRowCWCW2flips(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRowRWRW(a cloned, b cloned, r)
+
+  private def jvmSparseRowRWCW(a: Matrix, b: Matrix, r: Option[Matrix]) =
+    jvmSparseRowRWRW(a, b cloned, r)
+
+
+  private def jvmSparseRowCWRW(a: Matrix, b: Matrix, r: Option[Matrix]) =
+    jvmSparseRowRWRW(a cloned, b, r)
+
+  private def jvmSparseRWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(a, b.cloned, r)
+
+  private def jvmSparseCWRW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(a cloned, b, r)
+
+  private def jvmSparseCWCW(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(b.t, a.t, r.map(_.t)).t
+
+  private def jvmSparseCWCW2flips(a: Matrix, b: Matrix, r: Option[Matrix] = None) =
+    jvmSparseRWRW(a cloned, b cloned, r)
+
+  private def jvmDiagRW(diagm:Matrix, b:Matrix, r:Option[Matrix] = None):Matrix = {
+    val mxR = r.getOrElse(b.like(diagm.nrow, b.ncol))
+
+    for (del ← diagm.diagv.nonZeroes())
+      mxR(del.index, ::).assign(b(del.index, ::), Functions.plusMult(del))
+
+    mxR
+  }
+
+  private def jvmDiagCW(diagm: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+    val mxR = r.getOrElse(b.like(diagm.nrow, b.ncol))
+    for (bcol ← b.t) mxR(::, bcol.index()) := bcol * diagm.diagv
+    mxR
+  }
+
+  private def jvmCWDiag(a: Matrix, diagm: Matrix, r: Option[Matrix] = None) =
+    jvmDiagRW(diagm, a.t, r.map {_.t}).t
+
+  private def jvmRWDiag(a: Matrix, diagm: Matrix, r: Option[Matrix] = None) =
+    jvmDiagCW(diagm, a.t, r.map {_.t}).t
+
+
+  /** Dense column-wise AA' */
+  private def jvmDCWAAt(a:Matrix, b:Matrix, r:Option[Matrix] = None) = {
+    // a.t must be equiv. to b. Cloning must rewrite to row-wise.
+    jvmDRWAAt(a.cloned,null,r)
+  }
+
+  /** Dense Row-wise AA' */
+  private def jvmDRWAAt(a:Matrix, b:Matrix, r:Option[Matrix] = None) = {
+    // a.t must be equiv to b.
+
+    debug("AAt computation detected.")
+
+    // Check dimensions if result is supplied.
+    require(r.forall(mxR ⇒ mxR.nrow == a.nrow && mxR.ncol == a.nrow))
+
+    val mxR = r.getOrElse(a.like(a.nrow, a.nrow))
+
+    // This is symmetric computation. Compile upper triangular first.
+    for (row ← 0 until mxR.nrow) {
+      // diagonal value
+      mxR(row, row) = a(row, ::).aggregate(Functions.PLUS, Functions.SQUARE)
+
+      for ( col ← row + 1 until mxR.ncol) {
+        // this vector-vector should be sort of optimized, right?
+        val v = a(row, ::) dot a(col, ::)
+
+        mxR(row, col) = v
+        mxR(col,row) = v
+      }
+    }
+
+    mxR
+  }
+
+  private def jvmOuterProdSum(a: Matrix, b: Matrix, r: Option[Matrix] = None): Matrix = {
+
+    // This may be already laid out for outer product computation, which may be faster than reorienting
+    // both matrices? need to check.
+    val (m, n) = (a.nrow, b.ncol)
+
+    // Prefer col-wise result iff a is dense and b is sparse. In all other cases default to row-wise.
+    val preferColWiseR = a.getFlavor.isDense && !b.getFlavor.isDense
+
+    val mxR = r.getOrElse {
+      (a.getFlavor.isDense, preferColWiseR) match {
+        case (false, false) ⇒ b.like(m, n)
+        case (false, true) ⇒ b.like(n, m).t
+        case (true, false) ⇒ a.like(m, n)
+        case (true, true) ⇒ a.like(n, m).t
+      }
+    }
+
+    // Loop outer products
+    if (preferColWiseR) {
+      // this means B is sparse and A is not, so we need to iterate over b values and update R columns with +=
+      // one at a time.
+      for ((acol, brow) ← a.t.zip(b); bel ← brow.nonZeroes) mxR(::, bel.index()) += bel * acol
+    } else {
+      for ((acol, brow) ← a.t.zip(b); ael ← acol.nonZeroes()) mxR(ael.index(), ::) += ael * brow
+    }
+
+    mxR
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
index 910035f..3c0ae89 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MatrixOps.scala
@@ -17,8 +17,10 @@

 package org.apache.mahout.math.scalabindings

+import org.apache.mahout.math.flavor.TraversingStructureEnum
 import org.apache.mahout.math.{Matrices, QRDecomposition, Vector, Matrix}
-import scala.collection.JavaConversions._
+import collection._
+import JavaConversions._
 import org.apache.mahout.math.function.{DoubleDoubleFunction, VectorFunction, DoubleFunction, Functions}
 import scala.math._

@@ -41,6 +43,10 @@ class MatrixOps(val m: Matrix) {

   def +=(that: Matrix) = m.assign(that, Functions.PLUS)

+  def +=:(that:Matrix) = m += that
+
+  def +=:(that:Double) = m += that
+
   def -=(that: Matrix) = m.assign(that, Functions.MINUS)

   def +=(that: Double) = m.assign(new DoubleFunction {
@@ -70,24 +76,30 @@ class MatrixOps(val m: Matrix) {

   def -:(that: Double) = that -=: cloned

-
-  def norm = sqrt(m.aggregate(Functions.PLUS, Functions.SQUARE))
+  def norm = math.sqrt(m.aggregate(Functions.PLUS, Functions.SQUARE))

   def pnorm(p: Int) = pow(m.aggregate(Functions.PLUS, Functions.chain(Functions.ABS, Functions.pow(p))), 1.0 / p)

   def apply(row: Int, col: Int) = m.get(row, col)

-  def update(row: Int, col: Int, v: Double): Matrix = {
-    m.setQuick(row, col, v);
+  def update(row: Int, col: Int, that: Double): Matrix = {
+    m.setQuick(row, col, that);
     m
   }

+  def update(rowRange: Range, colRange: Range, that: Double) = apply(rowRange, colRange) := that
+
+  def update(row: Int, colRange: Range, that: Double) = apply(row, colRange) := that
+
+  def update(rowRange: Range, col: Int, that: Double) = apply(rowRange, col) := that
+
   def update(rowRange: Range, colRange: Range, that: Matrix) = apply(rowRange, colRange) := that

   def update(row: Int, colRange: Range, that: Vector) = apply(row, colRange) := that

   def update(rowRange: Range, col: Int, that: Vector) = apply(rowRange, col) := that
-  
+
+
   def apply(rowRange: Range, colRange: Range): Matrix = {

     if (rowRange == :: &&
@@ -140,12 +152,60 @@ class MatrixOps(val m: Matrix) {
     })
   }

+  def :=(that: Double) = m.assign(that)
+
   def :=(f: (Int, Int, Double) => Double): Matrix = {
-    for (r <- 0 until nrow; c <- 0 until ncol) m(r, c) = f(r, c, m(r, c))
+    import RLikeOps._
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.all) el := f(el.index, col.index, el)
+      case default =>
+        for (row <- m; el <- row.all) el := f(row.index, el.index, el)
+    }
+    m
+  }
+
+  /** Functional assign with (Double) => Double */
+  def :=(f: (Double) => Double): Matrix = {
+    import RLikeOps._
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.all) el := f(el)
+      case default =>
+        for (row <- m; el <- row.all) el := f(el)
+    }
     m
   }

-  def cloned: Matrix = m.like := m
+  /** Sparse assign: iterate and assign over non-zeros only */
+  def ::=(f: (Int, Int, Double) => Double): Matrix = {
+
+    import RLikeOps._
+
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.nonZeroes) el := f(el.index, col.index, el)
+      case default =>
+        for (row <- m; el <- row.nonZeroes) el := f(row.index, el.index, el)
+    }
+    m
+  }
+
+  /** Sparse function assign: iterate and assign over non-zeros only */
+  def ::=(f: (Double) => Double): Matrix = {
+
+    import RLikeOps._
+
+    m.getFlavor.getStructure match {
+      case TraversingStructureEnum.COLWISE | TraversingStructureEnum.SPARSECOLWISE =>
+        for (col <- t; el <- col.nonZeroes) el := f(el)
+      case default =>
+        for (row <- m; el <- row.nonZeroes) el := f(el)
+    }
+    m
+  }
+
+  def cloned: Matrix = m.like := m

   /**
    * Ideally, we would probably want to override equals(). But that is not
@@ -155,11 +215,14 @@ class MatrixOps(val m: Matrix) {
    * @return
    */
   def equiv(that: Matrix) =
+
+    // Warning: TODO: This would actually create empty objects in SparseMatrix. Should really implement
+    // merge-type comparison strategy using iterateNonEmpty.
     that != null &&
-        nrow == that.nrow &&
-        m.view.zip(that).forall(t => {
-          t._1.equiv(t._2)
-        })
+    nrow == that.nrow &&
+    m.view.zip(that).forall(t => {
+      t._1.equiv(t._2)
+    })

   def nequiv(that: Matrix) = !equiv(that)

http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
new file mode 100644
index 0000000..a1e9377
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/RLikeDoubleScalarOps.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.scalabindings
+
+import org.apache.mahout.math._
+
+class RLikeDoubleScalarOps(val x:Double) extends AnyVal{
+
+  import RLikeOps._
+
+  def +(that:Matrix) = that + x
+
+  def +(that:Vector) = that + x
+
+  def *(that:Matrix) = that * x
+
+  def *(that:Vector) = that * x
+
+  def -(that:Matrix) = x -: that
+
+  def -(that:Vector) = x -: that
+
+  def /(that:Matrix) = x /: that
+
+  def /(that:Vector) = x /: that
+
+  def cbind(that:Matrix) = {
+    val mx = that.like(that.nrow, that.ncol + 1)
+    mx(::, 1 until mx.ncol) := that
+    if (x != 0.0) mx(::, 0) := x
+    mx
+  }
+
+  def rbind(that: Matrix) = {
+    val mx = that.like(that.nrow + 1, that.ncol)
+    mx(1 until mx.nrow, ::) := that
+    if (x != 0.0) mx(0, ::) := x
+    mx
+  }
+
+  def c(that: Vector): Vector = {
+    val cv = that.like(that.length + 1)
+    cv(1 until cv.length) := that
+    cv(0) = x
+    cv
+  }
+
+}
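Illustrative aside on the sparse kernels in MMul.scala: jvmSparseRWRW (and jvmSparseRowRWRW for hashtable-backed rows) never forms full dot products. For every non-zero a(i, k) of the row-wise left operand it adds a(i, k) * B(k, ::) into row i of the result, so zero entries of A cost nothing. The Scala below is a minimal standalone sketch of that accumulation pattern, assuming a hypothetical layout in which each sparse row of A is an immutable Map from column index to value and B is a dense Array[Array[Double]]; SparseRowKernelSketch and times are made-up names for illustration, not Mahout API.

```scala
// Illustrative sketch only; not part of Mahout. Mirrors the row-wise
// "for each non-zero a(i, k): R(i, ::) += a(i, k) * B(k, ::)" pattern.
object SparseRowKernelSketch {

  // Hypothetical sparse row layout: column index -> non-zero value.
  type SparseRow = Map[Int, Double]

  /** Computes R = A %*% B for a sparse row-wise A (m x k) and a dense B (k x n). */
  def times(a: IndexedSeq[SparseRow], b: Array[Array[Double]], n: Int): Array[Array[Double]] = {
    require(b.forall(_.length == n), "Every row of B must have n columns.")
    val r = Array.ofDim[Double](a.length, n)
    for (i <- a.indices; (k, aik) <- a(i)) {
      require(k >= 0 && k < b.length, s"Column index $k is out of range for B.")
      val bRow = b(k)
      var j = 0
      // Same effect as mxR(i, ::).assign(b(k, ::), Functions.plusMult(aik)):
      // each result element becomes r + aik * b.
      while (j < n) {
        r(i)(j) += aik * bRow(j)
        j += 1
      }
    }
    r
  }
}
```

In this dense-B sketch the work is proportional to nnz(A) times ncol(B), rather than to the full m x k x n triple loop.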

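The jvmDRWAAt kernel in MMul.scala exploits symmetry: when the right operand is the transpose of the left one, R = A %*% A' has R(i, j) = a(i, ::) dot a(j, ::) = R(j, i). The kernel therefore computes each diagonal entry as a sum of squares and each upper-triangular entry once, then mirrors it, which needs m(m+1)/2 row products instead of m^2. A minimal sketch of the same idea on a dense, row-major Array[Array[Double]] follows; GramSketch, dot and aat are hypothetical names chosen for illustration, not part of Mahout.

```scala
// Illustrative sketch only; not part of Mahout. Mirrors the
// "upper triangle first, then mirror" strategy of the AA' kernel.
object GramSketch {

  /** Plain dot product of two equal-length dense vectors. */
  def dot(x: Array[Double], y: Array[Double]): Double = {
    require(x.length == y.length, "Vector lengths differ.")
    var s = 0.0
    var i = 0
    while (i < x.length) {
      s += x(i) * y(i)
      i += 1
    }
    s
  }

  /** Computes R = A %*% A' for a dense row-major A (m x n); R is m x m. */
  def aat(a: Array[Array[Double]]): Array[Array[Double]] = {
    val m = a.length
    val r = Array.ofDim[Double](m, m)
    for (row <- 0 until m) {
      // Diagonal entry: sum of squares of the row.
      r(row)(row) = a(row).map(v => v * v).sum
      // Upper triangle only; the lower triangle is filled by symmetry.
      for (col <- row + 1 until m) {
        val v = dot(a(row), a(col))
        r(row)(col) = v
        r(col)(row) = v
      }
    }
    r
  }
}
```

The diagonal could equally be computed as dot(a(row), a(row)); the sum-of-squares form simply follows the Functions.PLUS / Functions.SQUARE aggregate used in the kernel.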