Repository: spark Updated Branches: refs/heads/master 2d39711b0 -> 98b5ccd32
[SPARK-20930][ML] Destroy broadcasted centers after computing cost in KMeans ## What changes were proposed in this pull request? Destroy broadcast variables as soon as they are no longer needed: the broadcast cluster centers after computing the cost in `KMeansModel.computeCost`, the `ElogbetaBc` broadcast after its `sum()` in `LocalLDAModel`, and `bcWeights` after the gradient aggregation in `GradientDescent`. Also stop broadcasting `expElogbeta` in `LocalLDAModel.getTopicDistributionMethod`, which now uses the local `expElogbeta` matrix directly. ## How was this patch tested? Existing tests. Author: Zheng RuiFeng <[email protected]> Closes #18152 from zhengruifeng/destroy_kmeans_model. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98b5ccd3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98b5ccd3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98b5ccd3 Branch: refs/heads/master Commit: 98b5ccd32b909cccc38899efa923ca425b116744 Parents: 2d39711 Author: Zheng RuiFeng <[email protected]> Authored: Mon Jun 5 10:25:09 2017 +0100 Committer: Sean Owen <[email protected]> Committed: Mon Jun 5 10:25:09 2017 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 5 ++++- .../main/scala/org/apache/spark/mllib/clustering/LDAModel.scala | 4 ++-- .../org/apache/spark/mllib/optimization/GradientDescent.scala | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index df2a9c0..3ad08c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -85,7 +85,10 @@ class KMeansModel @Since("1.1.0") (@Since("1.0.0") val clusterCenters: Array[Vec @Since("0.8.0") def computeCost(data: RDD[Vector]): Double = { val bcCentersWithNorm = 
data.context.broadcast(clusterCentersWithNorm) - data.map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum() + val cost = data + .map(p => KMeans.pointCost(bcCentersWithNorm.value, new VectorWithNorm(p))).sum() + bcCentersWithNorm.destroy(blocking = false) + cost } http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 663f63c..4ab4200 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -320,6 +320,7 @@ class LocalLDAModel private[spark] ( docBound }.sum() + ElogbetaBc.destroy(blocking = false) // Bound component for prob(topic-term distributions): // E[log p(beta | eta) - log q(beta | lambda)] @@ -372,7 +373,6 @@ class LocalLDAModel private[spark] ( */ private[spark] def getTopicDistributionMethod(sc: SparkContext): Vector => Vector = { val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t) - val expElogbetaBc = sc.broadcast(expElogbeta) val docConcentrationBrz = this.docConcentration.asBreeze val gammaShape = this.gammaShape val k = this.k @@ -383,7 +383,7 @@ class LocalLDAModel private[spark] ( } else { val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference( termCounts, - expElogbetaBc.value, + expElogbeta, docConcentrationBrz, gammaShape, k) http://git-wip-us.apache.org/repos/asf/spark/blob/98b5ccd3/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala index 07a67a9..593cdd6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala @@ -246,6 +246,7 @@ object GradientDescent extends Logging { // c: (grad, loss, count) (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) }) + bcWeights.destroy(blocking = false) if (miniBatchSize > 0) { /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
