Repository: spark Updated Branches: refs/heads/master 538f22162 -> 72df5a301
SPARK-5148 [MLlib] Make usersOut/productsOut storagelevel in ALS configurable Author: Fernando Otero (ZeoS) <[email protected]> Closes #3953 from zeitos/storageLevel and squashes the following commits: 0f070b9 [Fernando Otero (ZeoS)] fix imports 6869e80 [Fernando Otero (ZeoS)] fix comment length 90c9f7e [Fernando Otero (ZeoS)] fix comment length 18a992e [Fernando Otero (ZeoS)] changing storage level Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72df5a30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72df5a30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72df5a30 Branch: refs/heads/master Commit: 72df5a301e706d9384f3a1c17b2c58b017632b1f Parents: 538f221 Author: Fernando Otero (ZeoS) <[email protected]> Authored: Thu Jan 8 12:42:54 2015 -0800 Committer: Xiangrui Meng <[email protected]> Committed: Thu Jan 8 12:42:54 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/mllib/recommendation/ALS.scala | 18 +++++++++++-- .../spark/mllib/recommendation/ALSSuite.scala | 27 ++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/72df5a30/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 90ac252..bee951a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -116,6 +116,7 @@ class ALS private ( /** storage level for user/product in/out links */ private var intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK + private var 
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK /** * Set the number of blocks for both user blocks and product blocks to parallelize the computation @@ -205,6 +206,19 @@ class ALS private ( } /** + * :: DeveloperApi :: + * Sets storage level for final RDDs (user/product used in MatrixFactorizationModel). The default + * value is `MEMORY_AND_DISK`. Users can change it to a serialized storage, e.g. + * `MEMORY_AND_DISK_SER` and set `spark.rdd.compress` to `true` to reduce the space requirement, + * at the cost of speed. + */ + @DeveloperApi + def setFinalRDDStorageLevel(storageLevel: StorageLevel): this.type = { + this.finalRDDStorageLevel = storageLevel + this + } + + /** * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples. * Returns a MatrixFactorizationModel with feature vectors for each user and product. */ @@ -307,8 +321,8 @@ class ALS private ( val usersOut = unblockFactors(users, userOutLinks) val productsOut = unblockFactors(products, productOutLinks) - usersOut.setName("usersOut").persist(StorageLevel.MEMORY_AND_DISK) - productsOut.setName("productsOut").persist(StorageLevel.MEMORY_AND_DISK) + usersOut.setName("usersOut").persist(finalRDDStorageLevel) + productsOut.setName("productsOut").persist(finalRDDStorageLevel) // Materialize usersOut and productsOut. 
usersOut.count() http://git-wip-us.apache.org/repos/asf/spark/blob/72df5a30/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 603d0ad..f3b7bfd 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -27,6 +27,7 @@ import org.jblas.DoubleMatrix import org.apache.spark.SparkContext._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.recommendation.ALS.BlockStats +import org.apache.spark.storage.StorageLevel object ALSSuite { @@ -139,6 +140,32 @@ class ALSSuite extends FunSuite with MLlibTestSparkContext { assert(u11 != u2) } + test("Storage Level for RDDs in model") { + val ratings = sc.parallelize(ALSSuite.generateRatings(10, 20, 5, 0.5, false, false)._1, 2) + var storageLevel = StorageLevel.MEMORY_ONLY + var model = new ALS() + .setRank(5) + .setIterations(1) + .setLambda(1.0) + .setBlocks(2) + .setSeed(1) + .setFinalRDDStorageLevel(storageLevel) + .run(ratings) + assert(model.productFeatures.getStorageLevel == storageLevel); + assert(model.userFeatures.getStorageLevel == storageLevel); + storageLevel = StorageLevel.DISK_ONLY + model = new ALS() + .setRank(5) + .setIterations(1) + .setLambda(1.0) + .setBlocks(2) + .setSeed(1) + .setFinalRDDStorageLevel(storageLevel) + .run(ratings) + assert(model.productFeatures.getStorageLevel == storageLevel); + assert(model.userFeatures.getStorageLevel == storageLevel); + } + test("negative ids") { val data = ALSSuite.generateRatings(50, 50, 2, 0.7, false, false) val ratings = sc.parallelize(data._1.map { case Rating(u, p, r) => --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
