This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2802ac3 [SPARK-35666][ML] gemv skip array shape checking
2802ac3 is described below
commit 2802ac321f7378c8a9113338c9872b8fd332de6b
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Wed Jun 16 08:54:34 2021 +0800
[SPARK-35666][ML] gemv skip array shape checking
### What changes were proposed in this pull request?
In the existing implementations, it is common for a vector or matrix to be
sliced or copied just to make its shape match what BLAS.gemv expects, which
complicates the logic and adds the extra cost of slicing and copying.
This PR adds gemv overloads that operate directly on raw arrays and only
require the arrays to be at least as long as the corresponding matrix
dimension, so callers can pass oversized buffers without slicing.
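For illustration, a minimal sketch of the call pattern this enables in the
block aggregators (not part of the patch; `coefficientsArray`, `numFeatures`,
`block` and `gradientSumArray` are the aggregator fields touched in the diff
below, and BLAS is Spark's internal `org.apache.spark.ml.linalg.BLAS` helper,
so this only compiles inside the spark.ml package):

    import org.apache.spark.ml.linalg.{BLAS, DenseVector}

    // Before: the coefficients had to be sliced into a DenseVector whose size
    // exactly matches block.matrix.numCols, and the margins wrapped in a DenseVector.
    val linear = new DenseVector(coefficientsArray.take(numFeatures))
    val vec = new DenseVector(Array.ofDim[Double](block.size))
    BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)

    // After: the new overloads accept raw arrays that may be longer than the
    // corresponding matrix dimension, so the buffers are passed as-is.
    val arr = Array.ofDim[Double](block.size)
    BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
    BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)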
### Why are the changes needed?
1. Avoid slicing and copying done only to satisfy shape checks.
2. Simplify the call sites.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing test suites.
Closes #32805 from zhengruifeng/new_blas_func_for_agg.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../scala/org/apache/spark/ml/linalg/BLAS.scala | 60 ++++++++++++++++++++--
.../ml/optim/aggregator/AFTBlockAggregator.scala | 24 +++------
.../aggregator/BinaryLogisticBlockAggregator.scala | 34 ++----------
.../ml/optim/aggregator/HingeBlockAggregator.scala | 34 ++----------
.../ml/optim/aggregator/HuberBlockAggregator.scala | 24 +++------
.../aggregator/LeastSquaresBlockAggregator.scala | 18 +++----
6 files changed, 84 insertions(+), 110 deletions(-)
diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
index 5a6bee3..0bc8b2f 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala
@@ -536,6 +536,32 @@ private[spark] object BLAS extends Serializable {
}
/**
+ * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+ */
+ def gemv(
+ alpha: Double,
+ A: Matrix,
+ x: Array[Double],
+ beta: Double,
+ y: Array[Double]): Unit = {
+ require(A.numCols <= x.length,
+ s"The columns of A don't match the number of elements of x. A: ${A.numCols}, x: ${x.length}")
+ require(A.numRows <= y.length,
+ s"The rows of A don't match the number of elements of y. A: ${A.numRows}, y:${y.length}")
+ if (alpha == 0.0 && beta == 1.0) {
+ // gemv: alpha is equal to 0 and beta is equal to 1. Returning y.
+ return
+ } else if (alpha == 0.0) {
+ getBLAS(A.numRows).dscal(A.numRows, beta, y, 1)
+ } else {
+ A match {
+ case smA: SparseMatrix => gemvImpl(alpha, smA, x, beta, y)
+ case dmA: DenseMatrix => gemvImpl(alpha, dmA, x, beta, y)
+ }
+ }
+ }
+
+ /**
* y := alpha * A * x + beta * y
* @param alpha a scalar to scale the multiplication A * x.
* @param A the matrix A that will be left multiplied to x. Size of m x n.
@@ -585,11 +611,24 @@ private[spark] object BLAS extends Serializable {
x: DenseVector,
beta: Double,
y: DenseVector): Unit = {
+ gemvImpl(alpha, A, x.values, beta, y.values)
+ }
+
+ /**
+ * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+ * For `DenseMatrix` A.
+ */
+ private def gemvImpl(
+ alpha: Double,
+ A: DenseMatrix,
+ xValues: Array[Double],
+ beta: Double,
+ yValues: Array[Double]): Unit = {
val tStrA = if (A.isTransposed) "T" else "N"
val mA = if (!A.isTransposed) A.numRows else A.numCols
val nA = if (!A.isTransposed) A.numCols else A.numRows
- nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, x.values, 1, beta,
- y.values, 1)
+ nativeBLAS.dgemv(tStrA, mA, nA, alpha, A.values, mA, xValues, 1, beta,
+ yValues, 1)
}
/**
@@ -715,8 +754,19 @@ private[spark] object BLAS extends Serializable {
x: DenseVector,
beta: Double,
y: DenseVector): Unit = {
- val xValues = x.values
- val yValues = y.values
+ gemvImpl(alpha, A, x.values, beta, y.values)
+ }
+
+ /**
+ * y[0: A.numRows] := alpha * A * x[0: A.numCols] + beta * y[0: A.numRows]
+ * For `SparseMatrix` A.
+ */
+ private def gemvImpl(
+ alpha: Double,
+ A: SparseMatrix,
+ xValues: Array[Double],
+ beta: Double,
+ yValues: Array[Double]): Unit = {
val mA: Int = A.numRows
val nA: Int = A.numCols
@@ -738,7 +788,7 @@ private[spark] object BLAS extends Serializable {
rowCounter += 1
}
} else {
- if (beta != 1.0) scal(beta, y)
+ if (beta != 1.0) getBLAS(mA).dscal(mA, beta, yValues, 1)
// Perform matrix-vector multiplication and add to y
var colCounterForA = 0
while (colCounterForA < nA) {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
index 23ceea2..355f00c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/AFTBlockAggregator.scala
@@ -55,8 +55,6 @@ private[ml] class AFTBlockAggregator (
s" but got type ${bcCoefficients.value.getClass}.")
}
- @transient private lazy val linear = Vectors.dense(coefficientsArray.take(numFeatures))
-
// pre-computed margin of an empty vector.
// with this variable as an offset, for a sparse vector, we only need to
// deal with non-zero values in prediction.
@@ -83,14 +81,13 @@ private[ml] class AFTBlockAggregator (
// sigma is the scale parameter of the AFT model
val sigma = math.exp(coefficientsArray(dim - 1))
- // vec/arr here represents margins
- val vec = new DenseVector(Array.ofDim[Double](size))
- val arr = vec.values
+ // arr here represents margins
+ val arr = Array.ofDim[Double](size)
if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
- BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+ BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
// in-place convert margins to gradient scales
- // then, vec represents gradient scales
+ // then, arr represents gradient scales
var localLossSum = 0.0
var i = 0
var sigmaGradSum = 0.0
@@ -112,17 +109,8 @@ private[ml] class AFTBlockAggregator (
lossSum += localLossSum
weightSum += size
- block.matrix match {
- case dm: DenseMatrix =>
- BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
- arr, 1, 1.0, gradientSumArray, 1)
-
- case sm: SparseMatrix =>
- val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
- BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
- BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
- gradientSumArray, 1)
- }
+ // update the linear part of gradientSumArray
+ BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
if (fitIntercept) {
// above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
index 09a4335..7c7f3ad 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/BinaryLogisticBlockAggregator.scala
@@ -61,12 +61,6 @@ private[ml] class BinaryLogisticBlockAggregator(
s"got type ${bcCoefficients.value.getClass}.)")
}
- @transient private lazy val linear = if (fitIntercept) {
- new DenseVector(coefficientsArray.take(numFeatures))
- } else {
- new DenseVector(coefficientsArray)
- }
-
// pre-computed margin of an empty vector.
// with this variable as an offset, for a sparse vector, we only need to
// deal with non-zero values in prediction.
@@ -94,17 +88,16 @@ private[ml] class BinaryLogisticBlockAggregator(
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
- // vec/arr here represents margins
- val vec = new DenseVector(Array.ofDim[Double](size))
- val arr = vec.values
+ // arr here represents margins
+ val arr = Array.ofDim[Double](size)
if (fitIntercept) {
val offset = if (fitWithMean) marginOffset else coefficientsArray.last
java.util.Arrays.fill(arr, offset)
}
- BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+ BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
// in-place convert margins to multiplier
- // then, vec/arr represents multiplier
+ // then, arr represents multiplier
var localLossSum = 0.0
var localWeightSum = 0.0
var multiplierSum = 0.0
@@ -134,24 +127,7 @@ private[ml] class BinaryLogisticBlockAggregator(
if (arr.forall(_ == 0)) return this
// update the linear part of gradientSumArray
- block.matrix match {
- case dm: DenseMatrix =>
- BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
- vec.values, 1, 1.0, gradientSumArray, 1)
-
- case sm: SparseMatrix if fitIntercept =>
- val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
- BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
- BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
- gradientSumArray, 1)
-
- case sm: SparseMatrix if !fitIntercept =>
- val gradSumVec = new DenseVector(gradientSumArray)
- BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
- case m =>
- throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
- }
+ BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
if (fitWithMean) {
// above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
index f99c531..d479a88 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeBlockAggregator.scala
@@ -58,12 +58,6 @@ private[ml] class HingeBlockAggregator(
s"got type ${bcCoefficients.value.getClass}.)")
}
- @transient private lazy val linear = if (fitIntercept) {
- new DenseVector(coefficientsArray.take(numFeatures))
- } else {
- new DenseVector(coefficientsArray)
- }
-
// pre-computed margin of an empty vector.
// with this variable as an offset, for a sparse vector, we only need to
// deal with non-zero values in prediction.
@@ -91,14 +85,13 @@ private[ml] class HingeBlockAggregator(
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
- // vec/arr here represents margins
- val vec = new DenseVector(Array.ofDim[Double](size))
- val arr = vec.values
+ // arr here represents margins
+ val arr = Array.ofDim[Double](size)
if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
- BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+ BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
// in-place convert margins to multiplier
- // then, vec/arr represents multiplier
+ // then, arr represents multiplier
var localLossSum = 0.0
var localWeightSum = 0.0
var multiplierSum = 0.0
@@ -128,24 +121,7 @@ private[ml] class HingeBlockAggregator(
if (arr.forall(_ == 0)) return this
// update the linear part of gradientSumArray
- block.matrix match {
- case dm: DenseMatrix =>
- BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
- vec.values, 1, 1.0, gradientSumArray, 1)
-
- case sm: SparseMatrix if fitIntercept =>
- val linearGradSumVec = new DenseVector(Array.ofDim[Double](numFeatures))
- BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
- BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
- gradientSumArray, 1)
-
- case sm: SparseMatrix if !fitIntercept =>
- val gradSumVec = new DenseVector(gradientSumArray)
- BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)
-
- case m =>
- throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
- }
+ BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
if (fitIntercept) {
// above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
index 034e35a..e8d6a26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberBlockAggregator.scala
@@ -60,8 +60,6 @@ private[ml] class HuberBlockAggregator(
s"got type ${bcCoefficients.value.getClass}.)")
}
- @transient private lazy val linear = new DenseVector(coefficientsArray.take(numFeatures))
-
// pre-computed margin of an empty vector.
// with this variable as an offset, for a sparse vector, we only need to
// deal with non-zero values in prediction.
@@ -89,14 +87,13 @@ private[ml] class HuberBlockAggregator(
if (block.weightIter.forall(_ == 0)) return this
val size = block.size
- // vec/arr here represents margins
- val vec = new DenseVector(Array.ofDim[Double](size))
- val arr = vec.values
+ // arr here represents margins
+ val arr = Array.ofDim[Double](size)
if (fitIntercept) java.util.Arrays.fill(arr, marginOffset)
- BLAS.gemv(1.0, block.matrix, linear, 1.0, vec)
+ BLAS.gemv(1.0, block.matrix, coefficientsArray, 1.0, arr)
// in-place convert margins to multiplier
- // then, vec/arr represents multiplier
+ // then, arr represents multiplier
val sigma = coefficientsArray.last
var sigmaGradSum = 0.0
var localLossSum = 0.0
@@ -133,17 +130,8 @@ private[ml] class HuberBlockAggregator(
lossSum += localLossSum
weightSum += localWeightSum
- block.matrix match {
- case dm: DenseMatrix =>
- BLAS.nativeBLAS.dgemv("N", dm.numCols, dm.numRows, 1.0, dm.values, dm.numCols,
- arr, 1, 1.0, gradientSumArray, 1)
-
- case sm: SparseMatrix =>
- val linearGradSumVec = Vectors.zeros(numFeatures).toDense
- BLAS.gemv(1.0, sm.transpose, vec, 0.0, linearGradSumVec)
- BLAS.javaBLAS.daxpy(numFeatures, 1.0, linearGradSumVec.values, 1,
- gradientSumArray, 1)
- }
+ // update the linear part of gradientSumArray
+ BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
if (fitIntercept) {
// above update of the linear part of gradientSumArray does NOT take the centering
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
index af69e60..962e0e3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LeastSquaresBlockAggregator.scala
@@ -60,12 +60,10 @@ private[ml] class LeastSquaresBlockAggregator(
protected override val dim: Int = numFeatures
- @transient private lazy val effectiveCoefVec = bcCoefficients.value match {
+ @transient private lazy val effectiveCoef = bcCoefficients.value match {
case DenseVector(values) =>
val inverseStd = bcInverseStd.value
- val effectiveCoefArray = Array.tabulate(numFeatures)(
- i => if (inverseStd(i) != 0) values(i) else 0.0)
- new DenseVector(effectiveCoefArray)
+ Array.tabulate(numFeatures)(i => if (inverseStd(i) != 0) values(i) else 0.0)
case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector but " +
s"got type ${bcCoefficients.value.getClass}.)")
@@ -96,15 +94,14 @@ private[ml] class LeastSquaresBlockAggregator(
val size = block.size
- // vec/arr here represents diffs
- val vec = new DenseVector(Array.ofDim[Double](size))
- val arr = vec.values
+ // arr here represents diffs
+ val arr = Array.ofDim[Double](size)
if (fitIntercept) java.util.Arrays.fill(arr, offset)
BLAS.javaBLAS.daxpy(size, -1.0 / labelStd, block.labels, 1, arr, 1)
- BLAS.gemv(1.0, block.matrix, effectiveCoefVec, 1.0, vec)
+ BLAS.gemv(1.0, block.matrix, effectiveCoef, 1.0, arr)
// in-place convert diffs to multipliers
- // then, vec/arr represents multipliers
+ // then, arr represents multipliers
var localLossSum = 0.0
var localWeightSum = 0.0
var i = 0
@@ -120,8 +117,7 @@ private[ml] class LeastSquaresBlockAggregator(
lossSum += localLossSum
weightSum += localWeightSum
- val gradSumVec = new DenseVector(gradientSumArray)
- BLAS.gemv(1.0, block.matrix.transpose, vec, 1.0, gradSumVec)
+ BLAS.gemv(1.0, block.matrix.transpose, arr, 1.0, gradientSumArray)
this
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]