This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cdc66a71ea3d [SPARK-45757][ML] Avoid re-computation of NNZ in Binarizer
cdc66a71ea3d is described below
commit cdc66a71ea3d5da0f3f3ef7eabb215336ae472a5
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Thu Nov 2 14:27:14 2023 -0700
[SPARK-45757][ML] Avoid re-computation of NNZ in Binarizer
### What changes were proposed in this pull request?
1. Compress vectors with the given nnz in Binarizer;
2. Rename the internal function `def compressed(nnz: Int): Vector` to avoid
an ambiguous reference issue (`vec.compressed.apply(nnz)`) when there is no
type hint
```
[error]
/Users/ruifeng.zheng/Dev/spark/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala:132:61:
ambiguous reference to overloaded definition,
[error] both method compressed in trait Vector of type (nnz: Int):
org.apache.spark.ml.linalg.Vector
[error] and method compressed in trait Vector of type
org.apache.spark.ml.linalg.Vector
```
### Why are the changes needed?
The `nnz` is already known before compression, so recomputing the number of
nonzeros inside `compressed` is unnecessary work that can be avoided.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
ci
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #43619 from zhengruifeng/ml_binarizer_nnz.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/ml/linalg/Vectors.scala | 4 ++--
.../org/apache/spark/ml/feature/Binarizer.scala | 26 +++++++++++++---------
.../apache/spark/ml/feature/VectorAssembler.scala | 6 +++--
3 files changed, 22 insertions(+), 14 deletions(-)
diff --git
a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
index 016a8366ab86..d8e17ddd24db 100644
--- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
+++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/Vectors.scala
@@ -184,9 +184,9 @@ sealed trait Vector extends Serializable {
* Returns a vector in either dense or sparse format, whichever uses less
storage.
*/
@Since("2.0.0")
- def compressed: Vector = compressed(numNonzeros)
+ def compressed: Vector = compressedWithNNZ(numNonzeros)
- private[ml] def compressed(nnz: Int): Vector = {
+ private[ml] def compressedWithNNZ(nnz: Int): Vector = {
// A dense vector needs 8 * size + 8 bytes, while a sparse vector needs 12
* nnz + 20 bytes.
if (1.5 * (nnz + 1.0) < size) {
toSparseWithSize(nnz)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index 2ec7a8632e39..2e09e7444957 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -112,10 +112,11 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0")
override val uid: String)
(Seq($(inputCol)), Seq($(outputCol)), Seq($(threshold)))
}
- val mappedOutputCols = inputColNames.zip(tds).map { case (inputColName,
td) =>
- val binarizerUDF = dataset.schema(inputColName).dataType match {
+ val mappedOutputCols = inputColNames.zip(tds).map { case (colName, td) =>
+ dataset.schema(colName).dataType match {
case DoubleType =>
- udf { in: Double => if (in > td) 1.0 else 0.0 }
+ when(!col(colName).isNaN && col(colName) > td, lit(1.0))
+ .otherwise(lit(0.0))
case _: VectorUDT if td >= 0 =>
udf { vector: Vector =>
@@ -124,27 +125,32 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0")
override val uid: String)
vector.foreachNonZero { (index, value) =>
if (value > td) {
indices += index
- values += 1.0
+ values += 1.0
}
}
- Vectors.sparse(vector.size, indices.result(),
values.result()).compressed
- }
+
+ val idxArray = indices.result()
+ val valArray = values.result()
+ Vectors.sparse(vector.size, idxArray, valArray)
+ .compressedWithNNZ(idxArray.length)
+ }.apply(col(colName))
case _: VectorUDT if td < 0 =>
this.logWarning(s"Binarization operations on sparse dataset with
negative threshold " +
s"$td will build a dense output, so take care when applying to
sparse input.")
udf { vector: Vector =>
val values = Array.fill(vector.size)(1.0)
+ var nnz = vector.size
vector.foreachNonZero { (index, value) =>
if (value <= td) {
values(index) = 0.0
+ nnz -= 1
}
}
- Vectors.dense(values).compressed
- }
- }
- binarizerUDF(col(inputColName))
+ Vectors.dense(values).compressedWithNNZ(nnz)
+ }.apply(col(colName))
+ }
}
val outputMetadata = outputColNames.map(outputSchema(_).metadata)
diff --git
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 761352e34a3e..cf5b5ecb2014 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -296,7 +296,9 @@ object VectorAssembler extends
DefaultParamsReadable[VectorAssembler] {
throw new SparkException(s"$o of type ${o.getClass.getName} is not
supported.")
}
- val (idxArray, valArray) = (indices.result(), values.result())
- Vectors.sparse(featureIndex, idxArray,
valArray).compressed(idxArray.length)
+ val idxArray = indices.result()
+ val valArray = values.result()
+ Vectors.sparse(featureIndex, idxArray, valArray)
+ .compressedWithNNZ(idxArray.length)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]