[ https://issues.apache.org/jira/browse/SPARK-29844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dong Wang updated SPARK-29844:
------------------------------
    Summary: Improper unpersist strategy in ml.recommendation.ASL.train  (was: Wrong unpersist strategy in ml.recommendation.ASL.train)

> Improper unpersist strategy in ml.recommendation.ASL.train
> ----------------------------------------------------------
>
>                 Key: SPARK-29844
>                 URL: https://issues.apache.org/jira/browse/SPARK-29844
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> In ml.recommendation.ALS.train(), there are many intermediate RDDs. At the
> end of the method, these RDDs invoke unpersist(), but the unpersist() calls
> are mistimed, which causes recomputation and wastes memory.
> {code:scala}
>     val userIdAndFactors = userInBlocks
>       .mapValues(_.srcIds)
>       .join(userFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
>       // and userFactors.
>       }, preservesPartitioning = true)
>       .setName("userFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     val itemIdAndFactors = itemInBlocks
>       .mapValues(_.srcIds)
>       .join(itemFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       }, preservesPartitioning = true)
>       .setName("itemFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     if (finalRDDStorageLevel != StorageLevel.NONE) {
>       userIdAndFactors.count()
>       itemFactors.unpersist() // Premature unpersist
>       itemIdAndFactors.count()
>       userInBlocks.unpersist() // Lagging unpersist
>       userOutBlocks.unpersist() // Lagging unpersist
>       itemInBlocks.unpersist()
>       itemOutBlocks.unpersist() // Lagging unpersist
>       blockRatings.unpersist() // Lagging unpersist
>     }
>     (userIdAndFactors, itemIdAndFactors)
>   }
> {code}
> 1. Unpersist itemFactors too early.
>    itemIdAndFactors.count() still uses itemFactors (through the join), so
>    itemFactors will be recomputed.
> 2. Unpersist userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings too
>    late. The final action, itemIdAndFactors.count(), does not use these RDDs
>    as long as itemFactors is still cached, so they can be unpersisted before
>    it to save memory.
>
> By the way, userIdAndFactors and itemIdAndFactors are persisted here but are
> never unpersisted until the application ends. This may hurt performance, but
> I think it is hard to fix.
>
> This issue is reported by our tool CacheCheck, which dynamically detects
> persist()/unpersist() API misuses.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
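The reordering proposed in SPARK-29844 (call userIdAndFactors.count(), unpersist userInBlocks, userOutBlocks, itemOutBlocks and blockRatings, then call itemIdAndFactors.count(), and only then unpersist itemFactors and itemInBlocks) can be illustrated with a minimal, self-contained Scala sketch. It does not use Spark: `ToyRDD` and `UnpersistOrderDemo` are hypothetical names, the class models only lazy recomputation from lineage and caching, the lineage is a simplified assumption rather than what ALS actually builds, and the number of cached datasets is only a rough proxy for memory use.

```scala
import scala.collection.mutable

// Hypothetical toy stand-in for a cacheable RDD (NOT Spark's API). It models only
// lazy recomputation from lineage and whether a dataset is currently cached.
final class ToyRDD(val name: String, parents: Seq[ToyRDD], cache: mutable.Set[String]) {
  var computeCount = 0 // how many times this dataset was actually computed
  private var persisted = false

  def persist(): ToyRDD = { persisted = true; this }

  // Like RDD.unpersist(): drop the cached data and stop caching future computations.
  def unpersist(): Unit = { persisted = false; cache -= name }

  // Reuse cached data if present; otherwise compute the parents first, then this dataset.
  def compute(): Unit =
    if (!cache.contains(name)) {
      parents.foreach(_.compute())
      computeCount += 1
      if (persisted) cache += name
    }

  def count(): Unit = compute() // stands in for an action such as RDD.count()
}

object UnpersistOrderDemo {
  // Returns (times itemFactors was computed, datasets cached when the last count starts).
  def run(reordered: Boolean): (Int, Int) = {
    val cache = mutable.Set.empty[String]
    def rdd(name: String, parents: ToyRDD*): ToyRDD =
      new ToyRDD(name, parents, cache).persist()

    // Simplified lineage (assumption): every block RDD derives from blockRatings, and
    // each side's factors depend on its own in-blocks and the other side's out-blocks.
    val blockRatings  = rdd("blockRatings")
    val userInBlocks  = rdd("userInBlocks", blockRatings)
    val userOutBlocks = rdd("userOutBlocks", blockRatings)
    val itemInBlocks  = rdd("itemInBlocks", blockRatings)
    val itemOutBlocks = rdd("itemOutBlocks", blockRatings)
    val userFactors   = rdd("userFactors", userInBlocks, itemOutBlocks)
    val itemFactors   = rdd("itemFactors", itemInBlocks, userOutBlocks)
    itemFactors.count() // assume the training iterations already materialized both factor RDDs
    userFactors.count()

    val userIdAndFactors = rdd("userIdAndFactors", userInBlocks, userFactors)
    val itemIdAndFactors = rdd("itemIdAndFactors", itemInBlocks, itemFactors)
    var cachedAtLastCount = 0
    def lastCount(): Unit = { cachedAtLastCount = cache.size; itemIdAndFactors.count() }

    userIdAndFactors.count()
    if (!reordered) { // order quoted in SPARK-29844
      itemFactors.unpersist()
      lastCount()
      Seq(userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks, blockRatings)
        .foreach(_.unpersist())
    } else { // proposed order: release unused blocks first, itemFactors after its last use
      Seq(userInBlocks, userOutBlocks, itemOutBlocks, blockRatings).foreach(_.unpersist())
      lastCount()
      itemFactors.unpersist()
      itemInBlocks.unpersist()
    }
    (itemFactors.computeCount, cachedAtLastCount)
  }

  def main(args: Array[String]): Unit = {
    println(s"reported order: ${run(reordered = false)}") // (2,7)
    println(s"proposed order: ${run(reordered = true)}")  // (1,4)
  }
}
```

Under these assumptions, the reported order computes itemFactors twice and keeps seven datasets cached while the final count runs, whereas the proposed order computes itemFactors once and keeps four cached.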