This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7227a94  [SPARK-30773][ML] Support NativeBlas for level-1 routines
7227a94 is described below

commit 7227a9404dce18e48368114c66c2e1bc4e4abfc1
Author: yan ma <[email protected]>
AuthorDate: Fri Mar 20 10:32:58 2020 -0500

    [SPARK-30773][ML] Support NativeBlas for level-1 routines
    
    ### What changes were proposed in this pull request?
    Change BLAS for part of level-1 routines(axpy, dot, scal(double, 
denseVector)) from java implementation to NativeBLAS when vector size>256
    
    ### Why are the changes needed?
    In current ML BLAS.scala, all level-1 routines are fixed to use java
    implementation. But NativeBLAS(intel MKL, OpenBLAS) can bring up to 11X
    performance improvement based on performance test which apply direct
    calls against these methods. We should provide a way to allow user take
    advantage of NativeBLAS for level-1 routines. Here we do it through
    switching to NativeBLAS for these methods from f2jBLAS.
    
    ### Does this PR introduce any user-facing change?
     Yes, methods axpy, dot, scal in level-1 routines will switch to NativeBLAS 
when it has more than nativeL1Threshold(fixed value 256) elements and will 
fallback to f2jBLAS if native BLAS is not properly configured in system.
    
    ### How was this patch tested?
    Perf test direct calls level-1 routines
    
    Closes #27546 from yma11/SPARK-30773.
    
    Lead-authored-by: yan ma <[email protected]>
    Co-authored-by: Ma Yan <[email protected]>
    Signed-off-by: Sean Owen <[email protected]>
    (cherry picked from commit fae981e5f32e0ddb86591a616829423dfafb4ed0)
    Signed-off-by: Sean Owen <[email protected]>
---
 docs/ml-guide.md                                   |  2 +-
 .../scala/org/apache/spark/ml/linalg/BLAS.scala    | 27 ++++++++++++++--------
 .../org/apache/spark/ml/linalg/BLASSuite.scala     |  6 +++++
 .../scala/org/apache/spark/mllib/linalg/BLAS.scala | 27 ++++++++++++++--------
 .../org/apache/spark/mllib/linalg/BLASSuite.scala  |  6 +++++
 5 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/docs/ml-guide.md b/docs/ml-guide.md
index 2037285..5ce6b4f 100644
--- a/docs/ml-guide.md
+++ b/docs/ml-guide.md
@@ -78,7 +78,7 @@ The most popular native BLAS such as [Intel 
MKL](https://software.intel.com/en-u
 
 Configuring these BLAS implementations to use a single thread for operations 
may actually improve performance (see 
[SPARK-21305](https://issues.apache.org/jira/browse/SPARK-21305)). It is 
usually optimal to match this to the number of cores each Spark task is 
configured to use, which is 1 by default and typically left at 1.
 
-Please refer to resources like the following to understand how to configure 
the number of threads these BLAS implementations use: [Intel 
MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications)
 and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded).
+Please refer to resources like the following to understand how to configure 
the number of threads these BLAS implementations use: [Intel 
MKL](https://software.intel.com/en-us/articles/recommended-settings-for-calling-intel-mkl-routines-from-multi-threaded-applications)
 or [Intel 
oneMKL](https://software.intel.com/en-us/onemkl-linux-developer-guide-improving-performance-with-threading)
 and [OpenBLAS](https://github.com/xianyi/OpenBLAS/wiki/faq#multi-threaded). 
Note that if nativeBLAS is n [...]
 
 To use MLlib in Python, you will need [NumPy](http://www.numpy.org) version 
1.4 or newer.
 
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala 
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index e054a15..fcd33a6 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -27,8 +27,9 @@ private[spark] object BLAS extends Serializable {
 
   @transient private var _f2jBLAS: NetlibBLAS = _
   @transient private var _nativeBLAS: NetlibBLAS = _
+  private val nativeL1Threshold: Int = 256
 
-  // For level-1 routines, we use Java implementation.
+  // For level-1 function dspmv, use f2jBLAS for better performance.
   private[ml] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
@@ -36,6 +37,14 @@ private[spark] object BLAS extends Serializable {
     _f2jBLAS
   }
 
+  private[ml] def getBLAS(vectorSize: Int): NetlibBLAS = {
+    if (vectorSize < nativeL1Threshold) {
+      f2jBLAS
+    } else {
+      nativeBLAS
+    }
+  }
+
   /**
    * y += a * x
    */
@@ -63,7 +72,7 @@ private[spark] object BLAS extends Serializable {
    */
   private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
     val n = x.size
-    f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1)
+    getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1)
   }
 
   /**
@@ -94,7 +103,7 @@ private[spark] object BLAS extends Serializable {
   private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = {
     require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension 
mismatch: " +
       s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, 
Y.numCols)}.")
-    f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
+    getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, 
Y.values, 1)
   }
 
   /**
@@ -123,7 +132,7 @@ private[spark] object BLAS extends Serializable {
    */
   private def dot(x: DenseVector, y: DenseVector): Double = {
     val n = x.size
-    f2jBLAS.ddot(n, x.values, 1, y.values, 1)
+    getBLAS(n).ddot(n, x.values, 1, y.values, 1)
   }
 
   /**
@@ -218,16 +227,16 @@ private[spark] object BLAS extends Serializable {
   def scal(a: Double, x: Vector): Unit = {
     x match {
       case sx: SparseVector =>
-        f2jBLAS.dscal(sx.values.length, a, sx.values, 1)
+        getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1)
       case dx: DenseVector =>
-        f2jBLAS.dscal(dx.values.length, a, dx.values, 1)
+        getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1)
       case _ =>
         throw new IllegalArgumentException(s"scal doesn't support vector type 
${x.getClass}.")
     }
   }
 
   // For level-3 routines, we use the native BLAS.
-  private def nativeBLAS: NetlibBLAS = {
+  private[ml] def nativeBLAS: NetlibBLAS = {
     if (_nativeBLAS == null) {
       _nativeBLAS = NativeBLAS
     }
@@ -374,7 +383,7 @@ private[spark] object BLAS extends Serializable {
       // gemm: alpha is equal to 0 and beta is equal to 1. Returning C.
       return
     } else if (alpha == 0.0) {
-      f2jBLAS.dscal(C.values.length, beta, C.values, 1)
+      getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
     } else {
       A match {
         case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C)
@@ -480,7 +489,7 @@ private[spark] object BLAS extends Serializable {
     } else {
       // Scale matrix first if `beta` is not equal to 1.0
       if (beta != 1.0) {
-        f2jBLAS.dscal(C.values.length, beta, C.values, 1)
+        getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
       }
       // Perform matrix multiplication and add to C. The rows of A are 
multiplied by the columns of
       // B, and added to C.
diff --git 
a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala 
b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
index 877ac68..781f3da 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
@@ -23,6 +23,12 @@ import org.apache.spark.ml.util.TestingUtils._
 
 class BLASSuite extends SparkMLFunSuite {
 
+  test("nativeL1Threshold") {
+    assert(getBLAS(128) == BLAS.f2jBLAS)
+    assert(getBLAS(256) == BLAS.nativeBLAS)
+    assert(getBLAS(512) == BLAS.nativeBLAS)
+  }
+
   test("copy") {
     val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0))
     val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
index 1f5558d..3371b05 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -29,8 +29,9 @@ private[spark] object BLAS extends Serializable with Logging {
 
   @transient private var _f2jBLAS: NetlibBLAS = _
   @transient private var _nativeBLAS: NetlibBLAS = _
+  private val nativeL1Threshold: Int = 256
 
-  // For level-1 routines, we use Java implementation.
+  // For level-1 function dspmv, use f2jBLAS for better performance.
   private[mllib] def f2jBLAS: NetlibBLAS = {
     if (_f2jBLAS == null) {
       _f2jBLAS = new F2jBLAS
@@ -38,6 +39,14 @@ private[spark] object BLAS extends Serializable with Logging 
{
     _f2jBLAS
   }
 
+  private[mllib] def getBLAS(vectorSize: Int): NetlibBLAS = {
+    if (vectorSize < nativeL1Threshold) {
+      f2jBLAS
+    } else {
+      nativeBLAS
+    }
+  }
+
   /**
    * y += a * x
    */
@@ -65,7 +74,7 @@ private[spark] object BLAS extends Serializable with Logging {
    */
   private def axpy(a: Double, x: DenseVector, y: DenseVector): Unit = {
     val n = x.size
-    f2jBLAS.daxpy(n, a, x.values, 1, y.values, 1)
+    getBLAS(n).daxpy(n, a, x.values, 1, y.values, 1)
   }
 
   /**
@@ -96,7 +105,7 @@ private[spark] object BLAS extends Serializable with Logging 
{
   private[spark] def axpy(a: Double, X: DenseMatrix, Y: DenseMatrix): Unit = {
     require(X.numRows == Y.numRows && X.numCols == Y.numCols, "Dimension 
mismatch: " +
       s"size(X) = ${(X.numRows, X.numCols)} but size(Y) = ${(Y.numRows, 
Y.numCols)}.")
-    f2jBLAS.daxpy(X.numRows * X.numCols, a, X.values, 1, Y.values, 1)
+    getBLAS(X.values.length).daxpy(X.numRows * X.numCols, a, X.values, 1, 
Y.values, 1)
   }
 
   /**
@@ -125,7 +134,7 @@ private[spark] object BLAS extends Serializable with 
Logging {
    */
   private def dot(x: DenseVector, y: DenseVector): Double = {
     val n = x.size
-    f2jBLAS.ddot(n, x.values, 1, y.values, 1)
+    getBLAS(n).ddot(n, x.values, 1, y.values, 1)
   }
 
   /**
@@ -220,16 +229,16 @@ private[spark] object BLAS extends Serializable with 
Logging {
   def scal(a: Double, x: Vector): Unit = {
     x match {
       case sx: SparseVector =>
-        f2jBLAS.dscal(sx.values.length, a, sx.values, 1)
+        getBLAS(sx.values.length).dscal(sx.values.length, a, sx.values, 1)
       case dx: DenseVector =>
-        f2jBLAS.dscal(dx.values.length, a, dx.values, 1)
+        getBLAS(dx.size).dscal(dx.values.length, a, dx.values, 1)
       case _ =>
         throw new IllegalArgumentException(s"scal doesn't support vector type 
${x.getClass}.")
     }
   }
 
   // For level-3 routines, we use the native BLAS.
-  private def nativeBLAS: NetlibBLAS = {
+  private[mllib] def nativeBLAS: NetlibBLAS = {
     if (_nativeBLAS == null) {
       _nativeBLAS = NativeBLAS
     }
@@ -356,7 +365,7 @@ private[spark] object BLAS extends Serializable with 
Logging {
     if (alpha == 0.0 && beta == 1.0) {
       logDebug("gemm: alpha is equal to 0 and beta is equal to 1. Returning 
C.")
     } else if (alpha == 0.0) {
-      f2jBLAS.dscal(C.values.length, beta, C.values, 1)
+      getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
     } else {
       A match {
         case sparse: SparseMatrix => gemm(alpha, sparse, B, beta, C)
@@ -462,7 +471,7 @@ private[spark] object BLAS extends Serializable with 
Logging {
     } else {
       // Scale matrix first if `beta` is not equal to 1.0
       if (beta != 1.0) {
-        f2jBLAS.dscal(C.values.length, beta, C.values, 1)
+        getBLAS(C.values.length).dscal(C.values.length, beta, C.values, 1)
       }
       // Perform matrix multiplication and add to C. The rows of A are 
multiplied by the columns of
       // B, and added to C.
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
index 6e68c1c..12ab2ac 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
@@ -23,6 +23,12 @@ import org.apache.spark.mllib.util.TestingUtils._
 
 class BLASSuite extends SparkFunSuite {
 
+  test("nativeL1Threshold") {
+    assert(getBLAS(128) == BLAS.f2jBLAS)
+    assert(getBLAS(256) == BLAS.nativeBLAS)
+    assert(getBLAS(512) == BLAS.nativeBLAS)
+  }
+
   test("copy") {
     val sx = Vectors.sparse(4, Array(0, 2), Array(1.0, -2.0))
     val dx = Vectors.dense(1.0, 0.0, -2.0, 0.0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to