This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dfa2328 [SPARK-26881][MLLIB] Heuristic for tree aggregate depth
dfa2328 is described below
commit dfa2328e2806edf6f079dcbf74f8ece764c1f130
Author: Rafael Renaudin <[email protected]>
AuthorDate: Mon Apr 8 20:56:53 2019 -0500
[SPARK-26881][MLLIB] Heuristic for tree aggregate depth
Changes proposed:
- Add a method that computes the treeAggregate depth required to avoid
  exceeding the driver max result size (first commit)
- Use it in the computation of the Gramian of RowMatrix (second commit)
Tests:
- One unit test checking the behavior of the depth computation method
- Tested at scale on a Hadoop cluster by running PCA on a large dataset
  (depth 3 was needed to succeed)
Debatable choice:
I'm not sure the RDD API is the right place for the depth computation
method. Its advantage is that it gives access to the driver max result size
and the RDD's number of partitions, which are used as default arguments for
the method. Semantically, though, such a method might belong in something
like org.apache.spark.util.Utils.
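
For reference, the heuristic's bound solves for depth directly: taking
logarithms of numPartitions^(1/depth) * objectSize <= maxResultSize gives
depth >= log(numPartitions) / (log(maxResultSize) - log(objectSize)); the
patch takes the ceiling of that ratio and clamps it to [1, 10].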
Closes #23983 from gagafunctor/Heuristic_for_treeAggregate_depth.
Authored-by: Rafael Renaudin <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/mllib/linalg/distributed/RowMatrix.scala | 34 +++++++++++++++++++++-
.../mllib/linalg/distributed/RowMatrixSuite.scala | 20 +++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 56caeac..43f48be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -27,6 +27,7 @@ import breeze.numerics.{sqrt => brzSqrt}
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MAX_RESULT_SIZE
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
@@ -117,6 +118,7 @@ class RowMatrix @Since("1.0.0") (
// Computes n*(n+1)/2, avoiding overflow in the multiplication.
// This succeeds when n <= 65535, which is checked above
val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))
+ val gramianSizeInBytes = nt * 8L
// Compute the upper triangular part of the gram matrix.
val GU = rows.treeAggregate(null.asInstanceOf[BDV[Double]])(
@@ -136,7 +138,8 @@ class RowMatrix @Since("1.0.0") (
U1
} else {
U1 += U2
- }
+ },
+ depth = getTreeAggregateIdealDepth(gramianSizeInBytes)
)
RowMatrix.triuToFull(n, GU.data)
@@ -775,6 +778,35 @@ class RowMatrix @Since("1.0.0") (
       s"The number of rows $m is different from what specified or previously computed: ${nRows}.")
    }
  }
+
+  /**
+   * Computes the desired tree aggregate depth necessary to avoid exceeding
+   * spark.driver.maxResultSize during aggregation.
+   * Based on the formula: (numPartitions)^(1/depth) * objectSize <= driverMaxResultSize
+   * @param aggregatedObjectSizeInBytes the size, in bytes, of the object being tree aggregated
+   */
+  private[spark] def getTreeAggregateIdealDepth(aggregatedObjectSizeInBytes: Long) = {
+    require(aggregatedObjectSizeInBytes > 0,
+      "Cannot compute aggregate depth heuristic based on a zero-size object to aggregate")
+
+    val maxDriverResultSizeInBytes = rows.conf.get[Long](MAX_RESULT_SIZE)
+
+    require(maxDriverResultSizeInBytes > aggregatedObjectSizeInBytes,
+      s"Cannot aggregate object of size $aggregatedObjectSizeInBytes Bytes, "
+        + s"as it's bigger than maxResultSize ($maxDriverResultSizeInBytes Bytes)")
+
+    val numerator = math.log(rows.getNumPartitions)
+    val denominator = math.log(maxDriverResultSizeInBytes) - math.log(aggregatedObjectSizeInBytes)
+    val desiredTreeDepth = math.ceil(numerator / denominator)
+
+    if (desiredTreeDepth > 4) {
+      logWarning(
+        s"Desired tree depth for treeAggregation is big ($desiredTreeDepth)."
+          + " Consider increasing driver max result size or reducing number of partitions")
+    }
+
+    math.min(math.max(1, desiredTreeDepth), 10).toInt
+  }
}
@Since("1.0.0")
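
As a quick sanity check of the heuristic, here is a minimal standalone Scala
sketch of the same computation (the object name and parameters are
illustrative, not part of the patch):

    // Minimal sketch, assuming the driver max result size is known in bytes.
    object TreeDepthSketch {
      def idealDepth(objectSizeInBytes: Long,
                     maxResultSizeInBytes: Long,
                     numPartitions: Int): Int = {
        require(objectSizeInBytes > 0 && maxResultSizeInBytes > objectSizeInBytes)
        // Solve numPartitions^(1/depth) * objectSize <= maxResultSize for depth.
        val depth = math.ceil(
          math.log(numPartitions) /
            (math.log(maxResultSizeInBytes) - math.log(objectSizeInBytes)))
        // Same clamping as the patch: at least 1, at most 10.
        math.min(math.max(1, depth), 10).toInt
      }

      def main(args: Array[String]): Unit = {
        // 100 partitions, 1 GiB driver limit, 100 MiB object:
        // depth 2 suffices, since 100^(1/2) * 100 MiB = 1000 MiB <= 1024 MiB.
        println(idealDepth(100L << 20, 1L << 30, 100)) // prints 2
      }
    }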
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index a4ca4f0..a0c4c68 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -101,6 +101,26 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
}
}
+  test("getTreeAggregateIdealDepth") {
+    val nbPartitions = 100
+    val vectors = sc.emptyRDD[Vector]
+      .repartition(nbPartitions)
+    val rowMat = new RowMatrix(vectors)
+
+    assert(rowMat.getTreeAggregateIdealDepth(100 * 1024 * 1024) === 2)
+    assert(rowMat.getTreeAggregateIdealDepth(110 * 1024 * 1024) === 3)
+    assert(rowMat.getTreeAggregateIdealDepth(700 * 1024 * 1024) === 10)
+
+    val zeroSizeException = intercept[Exception] {
+      rowMat.getTreeAggregateIdealDepth(0)
+    }
+    assert(zeroSizeException.getMessage.contains("zero-size object to aggregate"))
+    val objectBiggerThanResultSize = intercept[Exception] {
+      rowMat.getTreeAggregateIdealDepth(1100 * 1024 * 1024)
+    }
+    assert(objectBiggerThanResultSize.getMessage.contains("it's bigger than maxResultSize"))
+  }
+
test("similar columns") {
val colMags = Vectors.dense(math.sqrt(126), math.sqrt(66), math.sqrt(94))
val expected = BDM(
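
A worked check of the expected depths, assuming the suite runs with the
default spark.driver.maxResultSize of 1g (1024 MiB) and 100 partitions:
ceil(log(100) / log(1024/100)) = ceil(1.98) = 2 for a 100 MiB object,
ceil(log(100) / log(1024/110)) = ceil(2.07) = 3 for 110 MiB, and a 700 MiB
object yields ceil(12.1) = 13, clamped to the maximum depth of 10. The
1100 MiB case exceeds the 1024 MiB limit and fails the require check.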
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]