This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2802ac3  [SPARK-35666][ML] gemv skip array shape checking
2802ac3 is described below

commit 2802ac321f7378c8a9113338c9872b8fd332de6b
Author:     Ruifeng Zheng <ruife...@foxmail.com>
AuthorDate: Wed Jun 16 08:54:34 2021 +0800

    [SPARK-35666][ML] gemv skip array shape checking

    ### What changes were proposed in this pull request?
    In the existing implementations, it is common for a vector or matrix to be
    sliced or copied just to make shapes match, which complicates the logic and
    adds the extra cost of slicing and copying. This PR adds a `gemv` overload
    that operates on raw arrays and only requires the arrays to be at least as
    long as the corresponding matrix dimensions.

    ### Why are the changes needed?
    1. Avoid slicing and copying due to shape checking.
    2. Simplify the usages.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing test suites.

    Closes #32805 from zhengruifeng/new_blas_func_for_agg.

    Authored-by: Ruifeng Zheng <ruife...@foxmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@foxmail.com>
---
 .../scala/org/apache/spark/ml/linalg/BLAS.scala    | 60 ++++++++++++++++++++--
 .../ml/optim/aggregator/AFTBlockAggregator.scala   | 24 +++------
 .../aggregator/BinaryLogisticBlockAggregator.scala | 34 ++----------
 .../ml/optim/aggregator/HingeBlockAggregator.scala | 34 ++----------
 .../ml/optim/aggregator/HuberBlockAggregator.scala | 24 +++------
 .../aggregator/LeastSquaresBlockAggregator.scala   | 18 +++----
 6 files changed, 84 insertions(+), 110 deletions(-)

diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 5a6bee3..0bc8b2f 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -536,6 +536,32 @@ private[spark] object BLAS extends Serializable {
   }
 
   /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   */
+  def gemv(
+      alpha: Double,
+      A: Matrix,
+      x: Array[Double],
+      beta: Double,
+      y: Array[Double]): Unit = {
+    require(A.numCols <= x.length,
+      s"The columns of A don't match the number of elements of x. A: ${A.numCols}, x: ${x.length}")
+    require(A.numRows <= y.length,
+      s"The rows of A don't match the number of elements of y. A: ${A.numRows}, y:${y.length}")
+    if (alpha == 0.0 && beta == 1.0) {
+      // gemv: alpha is equal to 0 and beta is equal to 1. Returning y.
+      return
+    } else if (alpha == 0.0) {
+      getBLAS(A.numRows).dscal(A.numRows, beta, y, 1)
+    } else {
+      A match {
+        case smA: SparseMatrix => gemvImpl(alpha, smA, x, beta, y)
+        case dmA: DenseMatrix => gemvImpl(alpha, dmA, x, beta, y)
+      }
+    }
+  }
+
+  /**
    * y := alpha * A * x + beta * y
    * @param alpha a scalar to scale the multiplication A * x.
    * @param A the matrix A that will be left multiplied to x. Size of m x n.
@@ -585,11 +611,24 @@ private[spark] object BLAS extends Serializable {
       x: DenseVector,
       beta: Double,
       y: DenseVector): Unit = {
+    gemvImpl(alpha, A, x.values, beta, y.values)
+  }
+
+  /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   * For `DenseMatrix` A.
+   */
+  private def gemvImpl(
+      alpha: Double,
+      A: DenseMatrix,
+      xValues: Array[Double],
+      beta: Double,
+      yValues: Array[Double]): Unit = {
     val tStrA = if (A.isTransposed) "T" else "N"
     val mA = if (!A.isTransposed) A.numRows else A.numCols
     val nA = if (!A.isTransposed) A.numCols else A.numRows
-    nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, x.values, 1, beta,
-      y.values, 1)
+    nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, xValues, 1, beta,
+      yValues, 1)
   }
 
   /**
@@ -715,8 +754,19 @@ private[spark] object BLAS extends Serializable {
       x: DenseVector,
       beta: Double,
       y: DenseVector): Unit = {
-    val xValues = x.values
-    val yValues = y.values
+    gemvImpl(alpha, A, x.values, beta, y.values)
+  }
+
+  /**
+   * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+   * For `SparseMatrix` A.
+   */
+  private def gemvImpl(
+      alpha: Double,
+      A: SparseMatrix,
+      xValues: Array[Double],
+      beta: Double,
+      yValues: Array[Double]): Unit = {
     val mA: Int = A.numRows
     val nA: Int = A.numCols
 
@@ -738,7 +788,7 @@ private[spark] object BLAS extends Serializable {
         rowCounter += 1
       }
     } else {
-      if (beta != 1.0) scal(beta, y)
+      if (beta != 1.0) getBLAS(mA).dscal(mA, beta, yValues, 1)
       // Perform matrix-vector multiplication and add to y
       var colCounterForA = 0
       while (colCounterForA < nA) {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
index 23ceea2..355f00c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
@@ -55,8 +55,6 @@ private[ml] class AFTBlockAggregator (
       s" but got type ${bcCoefficients.value.getClass}.")
   }
 
-  @transient private lazy val linear = Vectors.dense(coefficientsArray.take(numFeatures))
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -83,14 +81,13 @@ private[ml] class AFTBlockAggregator (
     // sigma is the scale parameter of the AFT model
     val sigma = math.exp(coefficientsArray(dim - 1))
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to gradient scales
-    // then, vec represents gradient scales
+    // then, arr represents gradient scales
     var localLossSum = 0.0
     var i = 0
     var sigmaGradSum = 0.0
@@ -112,17 +109,8 @@ private[ml] class AFTBlockAggregator (
     lossSum += localLossSum
     weightSum += size
 
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          arr, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-    }
+    // update the linear part of gradientSumArray
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
       // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
index 09a4335..7c7f3ad 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
@@ -61,12 +61,6 @@ private[ml] class BinaryLogisticBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = if (fitIntercept) {
-    new DenseVector(coefficientsArray.take(numFeatures))
-  } else {
-    new DenseVector(coefficientsArray)
-  }
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -94,17 +88,16 @@ private[ml] class BinaryLogisticBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) {
       val offset = if (fitWithMean) marginOffset else coefficientsArray.last
       java.util.Arrays.fill(arr, offset)
     }
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var multiplierSum = 0.0
@@ -134,24 +127,7 @@ private[ml] class BinaryLogisticBlockAggregator(
     if (arr.forall(_ == 0)) return this
 
     // update the linear part of gradientSumArray
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          vec.values, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix if fitIntercept =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-
-      case sm: SparseMatrix if !fitIntercept =>
-        val gradSumVec = new DenseVector(gradientSumArray)
-        BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
-      case m =>
-        throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
-    }
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitWithMean) {
       // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
index f99c531..d479a88 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
@@ -58,12 +58,6 @@ private[ml] class HingeBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = if (fitIntercept) {
-    new DenseVector(coefficientsArray.take(numFeatures))
-  } else {
-    new DenseVector(coefficientsArray)
-  }
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -91,14 +85,13 @@ private[ml] class HingeBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var multiplierSum = 0.0
@@ -128,24 +121,7 @@ private[ml] class HingeBlockAggregator(
     if (arr.forall(_ == 0)) return this
 
     // update the linear part of gradientSumArray
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          vec.values, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix if fitIntercept =>
-        val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-
-      case sm: SparseMatrix if !fitIntercept =>
-        val gradSumVec = new DenseVector(gradientSumArray)
-        BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
-      case m =>
-        throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
-    }
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
       // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
index 034e35a..e8d6a26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
@@ -60,8 +60,6 @@ private[ml] class HuberBlockAggregator(
       s"got type ${bcCoefficients.value.getClass}.)")
   }
 
-  @transient private lazy val linear = new DenseVector(coefficientsArray.take(numFeatures))
-
   // pre-computed margin of an empty vector.
   // with this variable as an offset, for a sparse vector, we only need to
   // deal with non-zero values in prediction.
@@ -89,14 +87,13 @@ private[ml] class HuberBlockAggregator(
     if (block.weightIter.forall(_ == 0)) return this
     val size = block.size
 
-    // vec/arr here represents margins
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents margins
+    val arr = Array.ofDim[Double](size)
    if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
-    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
 
     // in-place convert margins to multiplier
-    // then, vec/arr represents multiplier
+    // then, arr represents multiplier
     val sigma = coefficientsArray.last
     var sigmaGradSum = 0.0
     var localLossSum = 0.0
@@ -133,17 +130,8 @@ private[ml] class HuberBlockAggregator(
     lossSum += localLossSum
     weightSum += localWeightSum
 
-    block.matrix match {
-      case dm: DenseMatrix =>
-        BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
-          arr, 1, 1.0, gradientSumArray, 1)
-
-      case sm: SparseMatrix =>
-        val linearGradSumVec = Vectors.zeros(numFeatures).toDense
-        BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
-        BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
-          gradientSumArray, 1)
-    }
+    // update the linear part of gradientSumArray
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     if (fitIntercept) {
       // above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
index af69e60..962e0e3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
@@ -60,12 +60,10 @@ private[ml] class LeastSquaresBlockAggregator(
 
   protected override val dim: Int = numFeatures
 
-  @transient private lazy val effectiveCoefVec = bcCoefficients.value match {
+  @transient private lazy val effectiveCoef = bcCoefficients.value match {
     case DenseVector(values) =>
       val inverseStd = bcInverseStd.value
-      val effectiveCoefArray = Array.tabulate(numFeatures)(
-        i => if (inverseStd(i) != 0) values(i) else 0.0)
-      new DenseVector(effectiveCoefArray)
+      Array.tabulate(numFeatures)(i => if (inverseStd(i) != 0) values(i) else 0.0)
     case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
       s"got type ${bcCoefficients.value.getClass}.)")
@@ -96,15 +94,14 @@ private[ml] class LeastSquaresBlockAggregator(
 
     val size = block.size
 
-    // vec/arr here represents diffs
-    val vec = new DenseVector(Array.ofDim[Double](size))
-    val arr = vec.values
+    // arr here represents diffs
+    val arr = Array.ofDim[Double](size)
     if (fitIntercept) java.util.Arrays.fill(arr, offset)
     BLAS.javaBLAS.daxpy(size, -1.0 / labelStd, block.labels, 1, arr, 1)
-    BLAS.gemv(1.0, block.matrix, effectiveCoefVec, 1.0, vec)
+    BLAS.gemv(1.0, block.matrix, effectiveCoef, 1.0, arr)
 
     // in-place convert diffs to multipliers
-    // then, vec/arr represents multipliers
+    // then, arr represents multipliers
     var localLossSum = 0.0
     var localWeightSum = 0.0
     var i = 0
@@ -120,8 +117,7 @@ private[ml] class LeastSquaresBlockAggregator(
     lossSum += localLossSum
     weightSum += localWeightSum
 
-    val gradSumVec = new DenseVector(gradientSumArray)
-    BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
+    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
 
     this
   }
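For illustration, here is a minimal sketch (not part of the commit) of the semantics of
the new array-based gemv. Since BLAS is private[spark], the snippet only compiles from
inside an org.apache.spark package; the object name GemvSketch and the arrays coef and
margins are hypothetical.

package org.apache.spark.ml

import org.apache.spark.ml.linalg.{BLAS, DenseMatrix}

object GemvSketch {
  def main(args: Array[String]): Unit = {
    // 2 x 3 column-major matrix: rows (1, 3, 5) and (2, 4, 6)
    val A = new DenseMatrix(2, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))

    // Three linear coefficients plus a trailing intercept slot. The old
    // vector-based API forced callers to slice off the intercept first,
    // e.g. Vectors.dense(coef.take(3)), before calling gemv.
    val coef = Array(1.0, 1.0, 1.0, 0.5)

    // Margin buffer with one extra slot. gemv reads only coef[0: A.numCols]
    // and updates only margins[0: A.numRows], so no slicing or copying occurs:
    // margins[0: 2] := 1.0 * A * coef[0: 3] + 1.0 * margins[0: 2]
    val margins = Array(0.5, 0.5, 99.0)
    BLAS.gemv(1.0, A, coef, 1.0, margins)

    println(margins.mkString(", "))  // 9.5, 12.5, 99.0 (last slot untouched)
  }
}

The gradient updates in the aggregators rely on the same relaxed check in the transposed
direction: gradientSumArray may carry a trailing intercept (or sigma) slot beyond
numFeatures, and BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
writes only its leading block.matrix.numCols entries.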
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org