Repository: spark
Updated Branches:
  refs/heads/master 6af7b43b3 -> 25b4f41d2


[SPARK-20677][MLLIB][ML] Follow-up to ALS recommend-all performance PRs

Small clean ups from #17742 and #17845.

## How was this patch tested?

Existing unit tests.

Author: Nick Pentreath <ni...@za.ibm.com>

Closes #17919 from MLnick/SPARK-20677-als-perf-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/25b4f41d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/25b4f41d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/25b4f41d

Branch: refs/heads/master
Commit: 25b4f41d239ac67402566c0254a893e2e58ae7d8
Parents: 6af7b43
Author: Nick Pentreath <ni...@za.ibm.com>
Authored: Tue May 16 10:59:34 2017 +0200
Committer: Nick Pentreath <ni...@za.ibm.com>
Committed: Tue May 16 10:59:34 2017 +0200

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/linalg/BLAS.scala |  2 +-
 .../apache/spark/ml/recommendation/ALS.scala    | 26 +++-------
 .../org/apache/spark/mllib/linalg/BLAS.scala    |  2 +-
 .../MatrixFactorizationModel.scala              | 51 ++++++++------------
 4 files changed, 28 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/25b4f41d/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
----------------------------------------------------------------------
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index ef38909..2a0f8c1 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -29,7 +29,7 @@ private[spark] object BLAS extends Serializable {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[ml] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/25b4f41d/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala 
b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index d626f04..0955d3e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{Dependency, Partitioner, 
ShuffleDependency, SparkContex
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.BLAS
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -363,7 +364,7 @@ class ALSModel private[ml] (
    * relatively efficient, the approach implemented here is significantly more 
efficient.
    *
    * This approach groups factors into blocks and computes the top-k elements 
per block,
-   * using a simple dot product (instead of gemm) and an efficient 
[[BoundedPriorityQueue]].
+   * using dot product and an efficient [[BoundedPriorityQueue]] (instead of 
gemm).
    * It then computes the global top-k by aggregating the per block top-k 
elements with
    * a [[TopByKeyAggregator]]. This significantly reduces the size of 
intermediate and shuffle data.
    * This is the DataFrame equivalent to the approach used in
@@ -393,31 +394,18 @@ class ALSModel private[ml] (
         val m = srcIter.size
         val n = math.min(dstIter.size, num)
         val output = new Array[(Int, Int, Float)](m * n)
-        var j = 0
+        var i = 0
         val pq = new BoundedPriorityQueue[(Int, Float)](num)(Ordering.by(_._2))
         srcIter.foreach { case (srcId, srcFactor) =>
           dstIter.foreach { case (dstId, dstFactor) =>
-            /*
-             * The below code is equivalent to
-             *    `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
-             * This handwritten version is as or more efficient as BLAS calls 
in this case.
-             */
-            var score = 0.0f
-            var k = 0
-            while (k < rank) {
-              score += srcFactor(k) * dstFactor(k)
-              k += 1
-            }
+            // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
+            val score = BLAS.f2jBLAS.sdot(rank, srcFactor, 1, dstFactor, 1)
             pq += dstId -> score
           }
-          val pqIter = pq.iterator
-          var i = 0
-          while (i < n) {
-            val (dstId, score) = pqIter.next()
-            output(j + i) = (srcId, dstId, score)
+          pq.foreach { case (dstId, score) =>
+            output(i) = (srcId, dstId, score)
             i += 1
           }
-          j += n
           pq.clear()
         }
         output.toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/25b4f41d/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
index 0cd68a6..cb97742 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -31,7 +31,7 @@ private[spark] object BLAS extends Serializable with Logging {
   @transient private var _nativeBLAS: NetlibBLAS = _
 
   // For level-1 routines, we use Java implementation.
-  private def f2jBLAS: NetlibBLAS = {
+  private[mllib] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/25b4f41d/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index d45866c..ac709ad 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -20,8 +20,6 @@ package org.apache.spark.mllib.recommendation
 import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
-import scala.collection.mutable
-
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.hadoop.fs.Path
@@ -33,7 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.BLAS
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
@@ -263,6 +261,19 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
 
   /**
    * Makes recommendations for all users (or products).
+   *
+   * Note: the previous approach used for computing top-k recommendations 
aimed to group
+   * individual factor vectors into blocks, so that Level 3 BLAS operations 
(gemm) could
+   * be used for efficiency. However, this causes excessive GC pressure due to 
the large
+   * arrays required for intermediate result storage, as well as a high 
sensitivity to the
+   * block size used.
+   *
+   * The following approach still groups factors into blocks, but instead 
computes the
+   * top-k elements per block, using dot product and an efficient 
[[BoundedPriorityQueue]]
+   * (instead of gemm). This avoids any large intermediate data structures and 
results
+   * in significantly reduced GC pressure as well as shuffle data, which far 
outweighs
+   * any cost incurred from not using Level 3 BLAS operations.
+   *
    * @param rank rank
    * @param srcFeatures src features to receive recommendations
    * @param dstFeatures dst features used to make recommendations
@@ -277,46 +288,22 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
       num: Int): RDD[(Int, Array[(Int, Double)])] = {
     val srcBlocks = blockify(srcFeatures)
     val dstBlocks = blockify(dstFeatures)
-    /**
-     * The previous approach used for computing top-k recommendations aimed to 
group
-     * individual factor vectors into blocks, so that Level 3 BLAS operations 
(gemm) could
-     * be used for efficiency. However, this causes excessive GC pressure due 
to the large
-     * arrays required for intermediate result storage, as well as a high 
sensitivity to the
-     * block size used.
-     * The following approach still groups factors into blocks, but instead 
computes the
-     * top-k elements per block, using a simple dot product (instead of gemm) 
and an efficient
-     * [[BoundedPriorityQueue]]. This avoids any large intermediate data 
structures and results
-     * in significantly reduced GC pressure as well as shuffle data, which far 
outweighs
-     * any cost incurred from not using Level 3 BLAS operations.
-     */
     val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
       val m = srcIter.size
       val n = math.min(dstIter.size, num)
       val output = new Array[(Int, (Int, Double))](m * n)
-      var j = 0
+      var i = 0
       val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
       srcIter.foreach { case (srcId, srcFactor) =>
         dstIter.foreach { case (dstId, dstFactor) =>
-          /*
-           * The below code is equivalent to
-           *    `val score = blas.ddot(rank, srcFactor, 1, dstFactor, 1)`
-           * This handwritten version is as or more efficient as BLAS calls in 
this case.
-           */
-          var score: Double = 0
-          var k = 0
-          while (k < rank) {
-            score += srcFactor(k) * dstFactor(k)
-            k += 1
-          }
+          // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
+          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
           pq += dstId -> score
         }
-        val pqIter = pq.iterator
-        var i = 0
-        while (i < n) {
-          output(j + i) = (srcId, pqIter.next())
+        pq.foreach { case (dstId, score) =>
+          output(i) = (srcId, (dstId, score))
           i += 1
         }
-        j += n
         pq.clear()
       }
       output.toSeq


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to