Repository: spark
Updated Branches:
  refs/heads/master 0dc4310b4 -> bce354c1d


[SPARK-16696][ML][MLLIB] destroy KMeans bcNewCenters when loop finished and 
update code where should release unused broadcast/RDD in proper time

## What changes were proposed in this pull request?

update unused broadcast in KMeans/Word2Vec,
use destroy(false) to release memory in time.

and several place destroy() update to destroy(false) so that it will be 
async-called,
it will better than blocking called.

and update bcNewCenters in KMeans to make it destroy in correct time.
I use a list to store all historical `bcNewCenters` generated in each loop 
iteration and delay them to release at the end of loop.

fix TODO in `BisectingKMeans.run` "unpersist old indices",
Implements the pattern "persist current step RDD, and unpersist previous one" 
in the loop iteration.

## How was this patch tested?

Existing tests.

Author: WeichenXu <[email protected]>

Closes #14333 from WeichenXu123/broadvar_unpersist_to_destroy.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bce354c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bce354c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bce354c1

Branch: refs/heads/master
Commit: bce354c1d4e2b97b1159913085e9883a26bc605a
Parents: 0dc4310
Author: WeichenXu <[email protected]>
Authored: Sat Jul 30 08:07:22 2016 -0700
Committer: Sean Owen <[email protected]>
Committed: Sat Jul 30 08:07:22 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/BisectingKMeans.scala   |  8 ++++++--
 .../scala/org/apache/spark/mllib/clustering/KMeans.scala  |  8 ++++++--
 .../scala/org/apache/spark/mllib/feature/Word2Vec.scala   | 10 +++++-----
 3 files changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bce354c1/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index f1664ce..e6b8971 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -165,6 +165,8 @@ class BisectingKMeans private (
     val random = new Random(seed)
     var numLeafClustersNeeded = k - 1
     var level = 1
+    var preIndices: RDD[Long] = null
+    var indices: RDD[Long] = null
     while (activeClusters.nonEmpty && numLeafClustersNeeded > 0 && level < 
LEVEL_LIMIT) {
       // Divisible clusters are sufficiently large and have non-trivial cost.
       var divisibleClusters = activeClusters.filter { case (_, summary) =>
@@ -194,8 +196,9 @@ class BisectingKMeans private (
           newClusters = summarize(d, newAssignments)
           newClusterCenters = newClusters.mapValues(_.center).map(identity)
         }
-        // TODO: Unpersist old indices.
-        val indices = updateAssignments(assignments, divisibleIndices, 
newClusterCenters).keys
+        if (preIndices != null) preIndices.unpersist()
+        preIndices = indices
+        indices = updateAssignments(assignments, divisibleIndices, 
newClusterCenters).keys
           .persist(StorageLevel.MEMORY_AND_DISK)
         assignments = indices.zip(vectors)
         inactiveClusters ++= activeClusters
@@ -208,6 +211,7 @@ class BisectingKMeans private (
       }
       level += 1
     }
+    if(indices != null) indices.unpersist()
     val clusters = activeClusters ++ inactiveClusters
     val root = buildTree(clusters)
     new BisectingKMeansModel(root)

http://git-wip-us.apache.org/repos/asf/spark/blob/bce354c1/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 9a3d64f..de9fa4a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.clustering
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.Since
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.clustering.{KMeans => NewKMeans}
 import org.apache.spark.ml.util.Instrumentation
@@ -309,7 +310,7 @@ class KMeans private (
         contribs.iterator
       }.reduceByKey(mergeContribs).collectAsMap()
 
-      bcActiveCenters.unpersist(blocking = false)
+      bcActiveCenters.destroy(blocking = false)
 
       // Update the cluster centers and costs for each active run
       for ((run, i) <- activeRuns.zipWithIndex) {
@@ -402,8 +403,10 @@ class KMeans private (
     // to their squared distance from that run's centers. Note that only 
distances between points
     // and new centers are computed in each iteration.
     var step = 0
+    var bcNewCentersList = ArrayBuffer[Broadcast[_]]()
     while (step < initializationSteps) {
       val bcNewCenters = data.context.broadcast(newCenters)
+      bcNewCentersList += bcNewCenters
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
           Array.tabulate(runs) { r =>
@@ -453,6 +456,7 @@ class KMeans private (
 
     mergeNewCenters()
     costs.unpersist(blocking = false)
+    bcNewCentersList.foreach(_.destroy(false))
 
     // Finally, we might have a set of more than k candidate centers for each 
run; weigh each
     // candidate by the number of points in the dataset mapping to it and run 
a local k-means++
@@ -464,7 +468,7 @@ class KMeans private (
       }
     }.reduceByKey(_ + _).collectAsMap()
 
-    bcCenters.unpersist(blocking = false)
+    bcCenters.destroy(blocking = false)
 
     val finalCenters = (0 until runs).par.map { r =>
       val myCenters = centers(r).toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/bce354c1/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index bc75646..9081987 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -430,13 +430,13 @@ class Word2Vec extends Serializable with Logging {
         }
         i += 1
       }
-      bcSyn0Global.unpersist(false)
-      bcSyn1Global.unpersist(false)
+      bcSyn0Global.destroy(false)
+      bcSyn1Global.destroy(false)
     }
     newSentences.unpersist()
-    expTable.destroy()
-    bcVocab.destroy()
-    bcVocabHash.destroy()
+    expTable.destroy(false)
+    bcVocab.destroy(false)
+    bcVocabHash.destroy(false)
 
     val wordArray = vocab.map(_.word)
     new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to