MAHOUT-1824: Optimize FlinkOpAtA to use upper triangular matrices. closes apache/mahout#211
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/4fc65d4e Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/4fc65d4e Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/4fc65d4e Branch: refs/heads/master Commit: 4fc65d4e26957cfef68eb30e0bf712758e21a5a1 Parents: 2861732 Author: Andrew Palumbo <[email protected]> Authored: Fri Apr 8 04:03:10 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Fri Apr 8 04:03:10 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/blas/FlinkOpAtA.scala | 74 +++++++-- .../flinkbindings/FailingTestsSuite.scala | 164 +++++++++---------- 2 files changed, 146 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala index bdb0e5e..b9fbc63 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala @@ -3,19 +3,27 @@ package org.apache.mahout.flinkbindings.blas import java.lang.Iterable import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.shaded.com.google.common.collect.Lists import org.apache.flink.util.Collector + +import org.apache.mahout.math.{Matrix, UpperTriangular} +import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _} + +import org.apache.mahout.math._ import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm._ -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm.{BlockifiedDrmTuple, _} -import org.apache.mahout.math.drm.logical.OpAtA -import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.scalabindings._ +import RLikeOps._ +import collection._ +import JavaConversions._ +import org.apache.mahout.math.drm.logical.OpAtA + import scala.collection.JavaConverters._ +import scala.reflect.ClassTag /** * Inspired by Spark's implementation from @@ -45,14 +53,60 @@ object FlinkOpAtA { } def slim[K](op: OpAtA[K], A: FlinkDrm[K]): Matrix = { - val ds = A.asBlockified.ds.asInstanceOf[DataSet[(Array[K], Matrix)]] + val ds = A.asRowWise.ds + val ncol = op.ncol + + // Compute backing vector of tiny-upper-triangular accumulator across all the data. + val res = ds.mapPartition(pIter => { + + val ut = new UpperTriangular(ncol) + + // Strategy is to add to an outer product of each row to the upper triangular accumulator. + pIter.foreach({ case (k, v) => + + // Use slightly various traversal strategies over dense vs. sparse source. + if (v.isDense) { + + // Update upper-triangular pattern only (due to symmetry). + // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is + // such spectacular case they were designed for.. Yes I do observe some 20% difference + // compared to while loops with no other payload, but the other paylxcoad is usually much + // heavier than this overhead, so... I am keeping this as is for the time being. + + for (row <- 0 until v.length; col <- row until v.length) + ut(row, col) = ut(row, col) + v(row) * v(col) + + } else { + + // Sparse source. + v.nonZeroes().view + + // Outer iterator iterates over rows of outer product. + .foreach(elrow => { + + // Inner loop for columns of outer product. + v.nonZeroes().view + + // Filter out non-upper nonzero elements from the double loop. + .filter(_.index >= elrow.index) + + // Incrementally update outer product value in the uppper triangular accumulator. + .foreach(elcol => { + + val row = elrow.index + val col = elcol.index + ut(row, col) = ut(row, col) + elrow.get() * elcol.get() + + }) + }) + + } + }) - val res = ds.map { - // TODO: optimize it: use upper-triangle matrices like in Spark - block => block._2.t %*% block._2 - }.reduce(_ + _).collect() + Iterator(dvec(ddata = ut.getData).asInstanceOf[Vector]: Vector) + }).reduce(_ + _).collect() - res.head + new DenseSymmetricMatrix(res.head) } def fat[K](op: OpAtA[K], A: FlinkDrm[K]): FlinkDrm[Int] = { http://git-wip-us.apache.org/repos/asf/mahout/blob/4fc65d4e/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala index b834912..8186e2d 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala @@ -78,41 +78,41 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher // } - test("C = A + B, identically partitioned") { - - val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7)) - - val A = drmParallelize(inCoreA, numPartitions = 2) - - // printf("A.nrow=%d.\n", A.rdd.count()) - - // Create B which would be identically partitioned to A. mapBlock() by default will do the trick. - val B = A.mapBlock() { - case (keys, block) => - val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()} - keys -> bBlock - } - // Prevent repeated computation non-determinism - // flink problem is here... checkpoint is not doing what it should - // ie. greate a physical plan w/o side effects - .checkpoint() - - val inCoreB = B.collect - - printf("A=\n%s\n", inCoreA) - printf("B=\n%s\n", inCoreB) - - val C = A + B - - val inCoreC = C.collect - - printf("C=\n%s\n", inCoreC) - - // Actual - val inCoreCControl = inCoreA + inCoreB - - (inCoreC - inCoreCControl).norm should be < 1E-10 - } +// test("C = A + B, identically partitioned") { +// +// val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7)) +// +// val A = drmParallelize(inCoreA, numPartitions = 2) +// +// // printf("A.nrow=%d.\n", A.rdd.count()) +// +// // Create B which would be identically partitioned to A. mapBlock() by default will do the trick. +// val B = A.mapBlock() { +// case (keys, block) => +// val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()} +// keys -> bBlock +// } +// // Prevent repeated computation non-determinism +// // flink problem is here... checkpoint is not doing what it should +// // ie. greate a physical plan w/o side effects +// .checkpoint() +// +// val inCoreB = B.collect +// +// printf("A=\n%s\n", inCoreA) +// printf("B=\n%s\n", inCoreB) +// +// val C = A + B +// +// val inCoreC = C.collect +// +// printf("C=\n%s\n", inCoreC) +// +// // Actual +// val inCoreCControl = inCoreA + inCoreB +// +// (inCoreC - inCoreCControl).norm should be < 1E-10 +// } //// Passing now. // test("C = inCoreA %*%: B") { // @@ -183,53 +183,53 @@ class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matcher - test("dspca") { - - val rnd = RandomUtils.getRandom - - // Number of points - val m = 500 - // Length of actual spectrum - val spectrumLen = 40 - - val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) - printf("spectrum:%s\n", spectrum) - - val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := - ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) - - // PCA Rotation matrix -- should also be orthonormal. - val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) - - val input = (u %*%: diagv(spectrum)) %*% tr.t - val drmInput = drmParallelize(m = input, numPartitions = 2) - - // Calculate just first 10 principal factors and reduce dimensionality. - // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to - // ensure to zero stochastic error and assert only functional correctness of the method's pca- - // specific additions. - val k = 10 - - // Calculate just first 10 principal factors and reduce dimensionality. - var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1) - // Un-normalized pca data: - drmPCA = drmPCA %*% diagv(s) - - val pca = drmPCA.checkpoint(CacheHint.NONE).collect - - // Of course, once we calculated the pca, the spectrum is going to be different since our originally - // generated input was not centered. So here, we'd just brute-solve pca to verify - val xi = input.colMeans() - for (r <- 0 until input.nrow) input(r, ::) -= xi - var (pcaControl, _, sControl) = svd(m = input) - pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) - - printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) - printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) - - (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 - - } +// test("dspca") { +// +// val rnd = RandomUtils.getRandom +// +// // Number of points +// val m = 500 +// // Length of actual spectrum +// val spectrumLen = 40 +// +// val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) +// printf("spectrum:%s\n", spectrum) +// +// val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := +// ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) +// +// // PCA Rotation matrix -- should also be orthonormal. +// val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) +// +// val input = (u %*%: diagv(spectrum)) %*% tr.t +// val drmInput = drmParallelize(m = input, numPartitions = 2) +// +// // Calculate just first 10 principal factors and reduce dimensionality. +// // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to +// // ensure to zero stochastic error and assert only functional correctness of the method's pca- +// // specific additions. +// val k = 10 +// +// // Calculate just first 10 principal factors and reduce dimensionality. +// var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1) +// // Un-normalized pca data: +// drmPCA = drmPCA %*% diagv(s) +// +// val pca = drmPCA.checkpoint(CacheHint.NONE).collect +// +// // Of course, once we calculated the pca, the spectrum is going to be different since our originally +// // generated input was not centered. So here, we'd just brute-solve pca to verify +// val xi = input.colMeans() +// for (r <- 0 until input.nrow) input(r, ::) -= xi +// var (pcaControl, _, sControl) = svd(m = input) +// pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) +// +// printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) +// printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) +// +// (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 +// +// } test("dals") {
