This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2802ac3  [SPARK-35666][ML] gemv skip array shape checking
2802ac3 is described below

commit 2802ac321f7378c8a9113338c9872b8fd332de6b
Author: Ruifeng Zheng <ruife...@foxmail.com>
AuthorDate: Wed Jun 16 08:54:34 2021 +0800

    [SPARK-35666][ML] gemv skip array shape checking
    
    ### What changes were proposed in this pull request?
    In existing impls, it is common that the vector/matrix needs to be sliced/copied just to make the shapes match, which complicates the logic and introduces the extra cost of slicing & copying.
    This PR adds a `gemv` overload that works directly on raw arrays and only requires the arrays to be large enough, so callers can skip the slicing/copying.
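
    A minimal before/after sketch of a call site (identifiers as in the
    AFTBlockAggregator diff below; the "before" lines show the slicing that
    this change removes):

        // before: slice/copy the coefficients so the shapes match exactly
        val linear = Vectors.dense(coefficientsArray.take(numFeatures))
        val vec = new DenseVector(Array.ofDim[Double](size))
        BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)

        // after: pass the raw arrays; the new overload only requires
        // x.length >= A.numCols and y.length >= A.numRows
        val arr = Array.ofDim[Double](size)
        BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)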
    
    ### Why are the changes needed?
    1, avoid slicing and copying due to shape checking;
    2, simplify the usages;
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    existing test suites
    
    Closes #32805 from zhengruifeng/new_blas_func_for_agg.
    
    Authored-by: Ruifeng Zheng <ruife...@foxmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@foxmail.com>
---
 .../scala/org/apache/spark/ml/linalg/BLAS.scala    | 60 ++++++++++++++++++++--
 .../ml/optim/aggregator/AFTBlockAggregator.scala   | 24 +++------
 .../aggregator/BinaryLogisticBlockAggregator.scala | 34 ++----------
 .../ml/optim/aggregator/HingeBlockAggregator.scala | 34 ++----------
 .../ml/optim/aggregator/HuberBlockAggregator.scala | 24 +++------
 .../aggregator/LeastSquaresBlockAggregator.scala   | 18 +++----
 6 files changed, 84 insertions(+), 110 deletions(-)

diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 5a6bee3..0bc8b2f 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -536,6 +536,32 @@ private[spark] object BLAS extends Serializable {
   }
 
   /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   */
+  def gemv(
+      alpha: Double,
+      A: Matrix,
+      x: Array[Double],
+      beta: Double,
+      y: Array[Double]): Unit = {
+    require(A.numCols <= x.length,
+      s"The columns of A don't match the number of elements of x. A: 
${A.numCols}, x: ${x.length}")
+    require(A.numRows <= y.length,
+      s"The rows of A don't match the number of elements of y. A: 
${A.numRows}, y:${y.length}")
+    if (alpha == 0.0 && beta == 1.0) {
+      // gemv: alpha is equal to 0 and beta is equal to 1. Returning y.
+      return
+    } else if (alpha == 0.0) {
+      getBLAS(A.numRows).dscal(A.numRows, beta, y, 1)
+    } else {
+      A match {
+        case smA: SparseMatrix => gemvImpl(alpha, smA, x, beta, y)
+        case dmA: DenseMatrix => gemvImpl(alpha, dmA, x, beta, y)
+      }
+    }
+  }
+
+  /**
    * y := alpha * A * x + beta * y
    * @param alpha a scalar to scale the multiplication A * x.
    * @param A the matrix A that will be left multiplied to x. Size of m x n.
@@ -585,11 +611,24 @@ private[spark] object BLAS extends Serializable {
       x: DenseVector,
       beta: Double,
       y: DenseVector): Unit = {
+    gemvImpl(alpha, A, x.values, beta, y.values)
+  }
+
+  /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   * For `DenseMatrix` A.
+   */
+  private def gemvImpl(
+      alpha: Double,
+      A: DenseMatrix,
+      xValues: Array[Double],
+      beta: Double,
+      yValues: Array[Double]): Unit = {
     val tStrA = if (A.isTransposed) "T" else "N"
     val mA = if (!A.isTransposed) A.numRows else A.numCols
     val nA = if (!A.isTransposed) A.numCols else A.numRows
-    nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, x.values, 1, beta,
-      y.values, 1)
+    nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, xValues, 1, beta,
+      yValues, 1)
   }
 
   /**
@@ -715,8 +754,19 @@ private[spark] object BLAS extends Serializable {
       x: DenseVector,
       beta: Double,
       y: DenseVector): Unit = {
-    val xValues = x.values
-    val yValues = y.values
+    gemvImpl(alpha, A, x.values, beta, y.values)
+  }
+
+  /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   * For `SparseMatrix` A.
+   */
+  private def gemvImpl(
+      alpha: Double,
+      A: SparseMatrix,
+      xValues: Array[Double],
+      beta: Double,
+      yValues: Array[Double]): Unit = {
     val mA: Int = A.numRows
     val nA: Int = A.numCols
 
@@ -738,7 +788,7 @@ private[spark] object BLAS extends Serializable {
         rowCounter += 1
       }
     } else {
-      if (beta != 1.0) scal(beta, y)
+      if (beta != 1.0) getBLAS(mA).dscal(mA, beta, yValues, 1)
       // Perform matrix-vector multiplication and add to y
       var colCounterForA = 0
       while (colCounterForA < nA) {
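
A minimal sketch of the relaxed shape contract introduced above (this assumes calling code inside the org.apache.spark package, since BLAS is private[spark]; the 2x2 values are made up for illustration):

    import org.apache.spark.ml.linalg.{BLAS, DenseMatrix}

    // A is 2 x 2; x and y may be longer than A's dimensions:
    // only x(0 until A.numCols) is read, and only y(0 until A.numRows) is updated.
    val A = new DenseMatrix(2, 2, Array(1.0, 0.0, 0.0, 1.0))  // column-major identity
    val x = Array(1.0, 2.0, 999.0)  // trailing slot is ignored
    val y = Array(0.0, 0.0, -1.0)   // trailing slot is left untouched
    BLAS.gemv(1.0, A, x, 1.0, y)    // y becomes (1.0, 2.0, -1.0)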
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
index 23ceea2..355f00c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
@@ -55,8 +55,6 @@ private[ml] class AFTBlockAggregator (
       s" but got type ${bcCoefficients.value.getClass}.")
   }
 
-  @transient private lazy val linear = Vectors.dense(coefficientsArray.take(numFeatures))
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -83,14 +81,13 @@ private[ml] class AFTBlockAggregator (
     // sigma is the scale parameter of the AFT model
     val sigma = math.exp(coefficientsArray(dim - 1))
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to gradient scales
-    // then, vec represents gradient scales
+    // then, arr represents gradient scales
     var localLossSum = 0.0
     var i = 0
     var sigmaGradSum = 0.0
@@ -112,17 +109,8 @@ private[ml] class AFTBlockAggregator (
     lossSum += localLossSum
     weightSum += size
 
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          arr, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-    }
+    // update the linear part of gradientSumArray
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
      // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
index 09a4335..7c7f3ad 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
@@ -61,12 +61,6 @@ private[ml] class BinaryLogisticBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = if (fitIntercept) {
-    new DenseVector(coefficientsArray.take(numFeatures))
-  } else {
-    new DenseVector(coefficientsArray)
-  }
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -94,17 +88,16 @@ private[ml] class BinaryLogisticBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) {
       val offset = if (fitWithMean) marginOffset else coefficientsArray.last
       java.util.Arrays.fill(arr, offset)
     }
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var multiplierSum = 0.0
@@ -134,24 +127,7 @@ private[ml] class BinaryLogisticBlockAggregator(
     if (arr.forall(_ == 0)) return this
 
     // update the linear part of gradientSumArray
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          vec.values, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix if fitIntercept =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-
-      case sm: SparseMatrix if !fitIntercept =>
-        val gradSumVec = new DenseVector(gradientSumArray)
-        BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
-      case m =>
-        throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
-    }
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitWithMean) {
      // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
index f99c531..d479a88 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
@@ -58,12 +58,6 @@ private[ml] class HingeBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = if (fitIntercept) {
-    new DenseVector(coefficientsArray.take(numFeatures))
-  } else {
-    new DenseVector(coefficientsArray)
-  }
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -91,14 +85,13 @@ private[ml] class HingeBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var multiplierSum = 0.0
@@ -128,24 +121,7 @@ private[ml] class HingeBlockAggregator(
     if (arr.forall(_ == 0)) return this
 
     // update the linear part of gradientSumArray
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          vec.values, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix if fitIntercept =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-
-      case sm: SparseMatrix if !fitIntercept =>
-        val gradSumVec = new DenseVector(gradientSumArray)
-        BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
-      case m =>
-        throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
-    }
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
      // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
index 034e35a..e8d6a26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
@@ -60,8 +60,6 @@ private[ml] class HuberBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = new DenseVector(coefficientsArray.take(numFeatures))
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -89,14 +87,13 @@ private[ml] class HuberBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     val sigma = coefficientsArray.last
     var sigmaGradSum = 0.0
     var localLossSum = 0.0
@@ -133,17 +130,8 @@ private[ml] class HuberBlockAggregator(
     lossSum += localLossSum
     weightSum += localWeightSum
 
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          arr, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix =>
-        val linearGradSumVec = Vectors.zeros(numFeatures).toDense
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-    }
+    // update the linear part of gradientSumArray
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
      // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
index af69e60..962e0e3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
@@ -60,12 +60,10 @@ private[ml] class LeastSquaresBlockAggregator(
 
   protected override val dim: Int = numFeatures
 
-  @transient private lazy val effectiveCoefVec = bcCoefficients.value match {
+  @transient private lazy val effectiveCoef = bcCoefficients.value match {
     case DenseVector(values) =>
       val inverseStd = bcInverseStd.value
-      val effectiveCoefArray = Array.tabulate(numFeatures)(
-        i => if (inverseStd(i) != 0) values(i) else 0.0)
-      new DenseVector(effectiveCoefArray)
+      Array.tabulate(numFeatures)(i => if (inverseStd(i) != 0) values(i) else 0.0)
 
    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
       s"got type ${bcCoefficients.value.getClass}.)")
@@ -96,15 +94,14 @@ private[ml] class LeastSquaresBlockAggregator(
 
     val size = block.size
 
-    // vec/arr here represents diffs
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents diffs
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, offset)
     BLAS.javaBLAS.daxpy(size, -1.0 / labelStd, block.labels, 1, arr, 1)
-    BLAS.gemv(1.0, block.matrix, effectiveCoefVec, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, effectiveCoef, 1.0, arr)
 
     // in-place convert diffs to multipliers
-    // then, vec/arr represents multipliers
+    // then, arr represents multipliers
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var i = 0
@@ -120,8 +117,7 @@ private[ml] class LeastSquaresBlockAggregator(
     lossSum += localLossSum
     weightSum += localWeightSum
 
-    val gradSumVec = new DenseVector(gradientSumArray)
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     this
   }
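
All five aggregators now share the same two-gemv shape for the per-block update; a hedged composite sketch of that pattern (the helper name updateInPlace is hypothetical, the identifiers follow the diffs above, and as before BLAS requires org.apache.spark scope):

    import org.apache.spark.ml.linalg.{BLAS, Matrix}

    // Hypothetical composite of the pattern above -- a sketch, not Spark's code.
    def updateInPlace(
        matrix: Matrix,                    // block.matrix in the diffs
        coefficientsArray: Array[Double],
        gradientSumArray: Array[Double],
        marginOffset: Double,
        fitIntercept: Boolean): Unit = {
      // arr holds the per-row margins: marginOffset + (matrix * coefficients)(i)
      val arr = Array.ofDim[Double](matrix.numRows)
      if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
      BLAS.gemv(1.0, matrix, coefficientsArray, 1.0, arr)

      // ... each aggregator converts arr in place from margins to multipliers ...

      // linear part of the gradient: gradientSumArray += matrix^T * arr,
      // replacing the former dense/sparse pattern match on the matrix type
      BLAS.gemv(1.0, matrix.transpose, arr, 1.0, gradientSumArray)
    }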

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
