[ 
https://issues.apache.org/jira/browse/SPARK-29844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Wang updated SPARK-29844:
------------------------------
    Summary: Improper unpersist strategy in ml.recommendation.ASL.train  (was: 
Wrong unpersist strategy in ml.recommendation.ASL.train)

> Improper unpersist strategy in ml.recommendation.ASL.train
> ----------------------------------------------------------
>
>                 Key: SPARK-29844
>                 URL: https://issues.apache.org/jira/browse/SPARK-29844
>             Project: Spark
>          Issue Type: Improvement
>          Components: ML
>    Affects Versions: 3.0.0
>            Reporter: Dong Wang
>            Priority: Major
>
> In ml.recommendation.ALS.train(), there are many intermediate RDDs. At the 
> end of the method, these RDDs are unpersisted, but the unpersist() calls are 
> mistimed, which causes recomputation and wasted memory.
> {code:scala}
>     val userIdAndFactors = userInBlocks
>       .mapValues(_.srcIds)
>       .join(userFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       // Preserve the partitioning because IDs are consistent with the
>       // partitioners in userInBlocks and userFactors.
>       }, preservesPartitioning = true)
>       .setName("userFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     val itemIdAndFactors = itemInBlocks
>       .mapValues(_.srcIds)
>       .join(itemFactors)
>       .mapPartitions({ items =>
>         items.flatMap { case (_, (ids, factors)) =>
>           ids.view.zip(factors)
>         }
>       }, preservesPartitioning = true)
>       .setName("itemFactors")
>       .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
>     if (finalRDDStorageLevel != StorageLevel.NONE) {
>       userIdAndFactors.count()
>       itemFactors.unpersist() // Premature unpersist
>       itemIdAndFactors.count()
>       userInBlocks.unpersist() // Lagging unpersist
>       userOutBlocks.unpersist() // Lagging unpersist
>       itemInBlocks.unpersist() 
>       itemOutBlocks.unpersist() // Lagging unpersist
>       blockRatings.unpersist() // Lagging unpersist
>     }
>     (userIdAndFactors, itemIdAndFactors)
>   }
> {code}
> 1. itemFactors is unpersisted too early. itemIdAndFactors.count() uses 
> itemFactors, so itemFactors will be recomputed.
> 2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are 
> unpersisted too late. The final action, itemIdAndFactors.count(), does not 
> use these RDDs, so they can be unpersisted before it to save memory.
> By the way, itemIdAndFactors is persisted here but will never be unpersisted 
> until the application ends. This may hurt performance, but I think it is 
> hard to fix.
> This issue was reported by our tool CacheCheck, which dynamically detects 
> misuses of the persist()/unpersist() API.
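
The reordering described in points 1 and 2 could look roughly like the sketch below. It is only an illustration, not a patch: the object UnpersistOrderSketch and the helper materializeAndRelease are hypothetical names that do not exist in Spark, and the RDDs are passed in as parameters so the snippet stands on its own. It also takes the report's claim at face value that itemIdAndFactors.count() needs only itemInBlocks and itemFactors.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical wrapper object; not part of Spark.
object UnpersistOrderSketch {

  // Hypothetical helper mirroring the tail of ALS.train() with the
  // unpersist() calls reordered as the report suggests.
  def materializeAndRelease(
      userIdAndFactors: RDD[_],
      itemIdAndFactors: RDD[_],
      itemFactors: RDD[_],
      userInBlocks: RDD[_],
      userOutBlocks: RDD[_],
      itemInBlocks: RDD[_],
      itemOutBlocks: RDD[_],
      blockRatings: RDD[_],
      finalRDDStorageLevel: StorageLevel): Unit = {
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      // Materialize the user-side result first.
      userIdAndFactors.count()

      // Assumption from the report: the remaining action does not read
      // these RDDs, so they can be released before it runs.
      userInBlocks.unpersist()
      userOutBlocks.unpersist()
      itemOutBlocks.unpersist()
      blockRatings.unpersist()

      // itemIdAndFactors is built from itemInBlocks and itemFactors, so
      // both stay cached until this action has finished.
      itemIdAndFactors.count()
      itemFactors.unpersist()
      itemInBlocks.unpersist()
    }
  }
}
```

This ordering keeps itemFactors cached while it is still read and frees the other four RDDs one action earlier. It does not address the missing unpersist of userIdAndFactors and itemIdAndFactors, which are returned to the caller.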



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
