Dong Wang created SPARK-29844:
---------------------------------

             Summary: Wrong unpersist strategy in ml.recommendation.ALS.train
                 Key: SPARK-29844
                 URL: https://issues.apache.org/jira/browse/SPARK-29844
             Project: Spark
          Issue Type: Improvement
          Components: ML
    Affects Versions: 3.0.0
            Reporter: Dong Wang


In ml.recommendation.ALS.train(), many intermediate RDDs are created. At the end
of the method, unpersist() is called on these RDDs, but several of the calls are
placed at the wrong point, which causes recomputation and wastes memory.
{code:scala}
    val userIdAndFactors = userInBlocks
      .mapValues(_.srcIds)
      .join(userFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      // Preserve the partitioning because IDs are consistent with the partitioners in userInBlocks
      // and userFactors.
      }, preservesPartitioning = true)
      .setName("userFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    val itemIdAndFactors = itemInBlocks
      .mapValues(_.srcIds)
      .join(itemFactors)
      .mapPartitions({ items =>
        items.flatMap { case (_, (ids, factors)) =>
          ids.view.zip(factors)
        }
      }, preservesPartitioning = true)
      .setName("itemFactors")
      .persist(finalRDDStorageLevel) // Missing unpersist, but hard to fix
    if (finalRDDStorageLevel != StorageLevel.NONE) {
      userIdAndFactors.count()
      itemFactors.unpersist() // Premature unpersist
      itemIdAndFactors.count()
      userInBlocks.unpersist() // Lagging unpersist
      userOutBlocks.unpersist() // Lagging unpersist
      itemInBlocks.unpersist() 
      itemOutBlocks.unpersist() // Lagging unpersist
      blockRatings.unpersist() // Lagging unpersist
    }
    (userIdAndFactors, itemIdAndFactors)
  }
{code}

1. itemFactors is unpersisted too early. itemIdAndFactors.count() uses
itemFactors, so itemFactors has to be recomputed.
2. userInBlocks, userOutBlocks, itemOutBlocks, and blockRatings are unpersisted
too late. The final action, itemIdAndFactors.count(), does not need these RDDs
as long as itemFactors is still cached, so they can be unpersisted before it to
save memory.

By the way, userIdAndFactors and itemIdAndFactors are persisted here but are
never unpersisted until the application ends. This may hurt performance, but I
think it is hard to fix.
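
The effect of the orderings above can be illustrated with a small, self-contained Scala model. This is a sketch under stated assumptions, not Spark code: ToyRDD, Tracker, and UnpersistOrderModel are hypothetical names; caching is modeled as a set of dataset names; counting how often each dataset is computed stands in for measuring recomputation; and the lineage wired up in build() is a simplified guess at how the RDDs in ALS.train() depend on each other (only userIdAndFactors and itemIdAndFactors follow the excerpt above directly).

{code:scala}
import scala.collection.mutable

// Toy model, NOT Spark's API: lazily evaluated datasets that can be cached.
object UnpersistOrderModel {

  // Shared bookkeeping for one run (hypothetical helper).
  final class Tracker {
    // How many times each dataset has been computed.
    val computeCount: mutable.Map[String, Int] = mutable.Map.empty[String, Int].withDefaultValue(0)
    // Names of the datasets currently cached.
    val cached: mutable.Set[String] = mutable.Set.empty[String]
    // Cache size recorded at the end of each action, keyed by dataset name.
    val cachedAfterAction: mutable.Map[String, Int] = mutable.Map.empty[String, Int]
  }

  final class ToyRDD(val name: String, parents: Seq[ToyRDD], t: Tracker) {
    private var keep = false

    def persist(): this.type = { keep = true; this }

    // Drops the cached copy; later evaluations recompute it and do not cache it again.
    def unpersist(): Unit = { keep = false; t.cached -= name }

    // Stands in for an action such as RDD.count(): forces evaluation.
    def count(): Unit = {
      materialize()
      t.cachedAfterAction(name) = t.cached.size
    }

    private def materialize(): Unit =
      if (!t.cached.contains(name)) {
        parents.foreach(_.materialize()) // cached parents return immediately
        t.computeCount(name) += 1
        if (keep) t.cached += name
      }
  }

  // Hypothetical, simplified lineage for the RDDs named in ALS.train().
  def build(t: Tracker): Map[String, ToyRDD] = {
    def rdd(name: String, parents: ToyRDD*): ToyRDD = new ToyRDD(name, parents, t).persist()
    val blockRatings = rdd("blockRatings")
    val userInBlocks = rdd("userInBlocks", blockRatings)
    val userOutBlocks = rdd("userOutBlocks", blockRatings)
    val itemInBlocks = rdd("itemInBlocks", blockRatings)
    val itemOutBlocks = rdd("itemOutBlocks", blockRatings)
    val userFactors = rdd("userFactors", userInBlocks, itemOutBlocks)
    val itemFactors = rdd("itemFactors", itemInBlocks, userOutBlocks, userFactors)
    val userIdAndFactors = rdd("userIdAndFactors", userInBlocks, userFactors)
    val itemIdAndFactors = rdd("itemIdAndFactors", itemInBlocks, itemFactors)
    Seq(blockRatings, userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks,
      userFactors, itemFactors, userIdAndFactors, itemIdAndFactors)
      .map(r => r.name -> r).toMap
  }

  final case class Result(itemFactorsRuns: Int, blockRatingsRuns: Int, cachedAfterItemCount: Int)

  // Materializes everything once (standing in for the training loop), then runs `tail`.
  def run(tail: Map[String, ToyRDD] => Unit): Result = {
    val t = new Tracker
    val rdds = build(t)
    rdds("itemFactors").count()
    tail(rdds)
    Result(t.computeCount("itemFactors"), t.computeCount("blockRatings"),
      t.cachedAfterAction("itemIdAndFactors"))
  }

  private val blocks = Seq("userInBlocks", "userOutBlocks", "itemOutBlocks", "blockRatings")

  // Ordering in the current code.
  val original: Map[String, ToyRDD] => Unit = r => {
    r("userIdAndFactors").count()
    r("itemFactors").unpersist()
    r("itemIdAndFactors").count()
    Seq("userInBlocks", "userOutBlocks", "itemInBlocks", "itemOutBlocks", "blockRatings")
      .foreach(n => r(n).unpersist())
  }

  // Ordering suggested in this issue: both fixes applied.
  val reordered: Map[String, ToyRDD] => Unit = r => {
    r("userIdAndFactors").count()
    blocks.foreach(n => r(n).unpersist())
    r("itemIdAndFactors").count()
    r("itemFactors").unpersist()
    r("itemInBlocks").unpersist()
  }

  // Only fix 2: blocks dropped early, but itemFactors is still dropped too early.
  val blocksEarlyOnly: Map[String, ToyRDD] => Unit = r => {
    r("userIdAndFactors").count()
    r("itemFactors").unpersist()
    blocks.foreach(n => r(n).unpersist())
    r("itemIdAndFactors").count()
    r("itemInBlocks").unpersist()
  }
}
{code}

Under this simplified lineage, run(original) computes itemFactors twice and leaves 8 datasets cached right after itemIdAndFactors.count(), while run(reordered) computes it once and leaves 5. run(blocksEarlyOnly) suggests that, at least in this model, the two fixes belong together: if the blocks are dropped early while itemFactors has already been unpersisted, recomputing itemFactors also recomputes blockRatings.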

This issue was reported by our tool CacheCheck, which dynamically detects
misuses of the persist()/unpersist() APIs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
