[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127179#comment-15127179 ]

holdenk edited comment on SPARK-13048 at 2/1/16 10:43 PM:
----------------------------------------------------------

This sounds useful, although we probably wouldn't want to always leave the last 
checkpoint around (and we would also need to provide a way for the user to clean 
up the last checkpoint).

We could add a getCheckPointedLDAModel method, or offer a param on the current 
method, and then add a cleanup function to the resulting LDAModel for the user 
to call. Any thoughts [~josephkb] or [~mengxr]? Another interesting question is 
how to expose something similar in the pipelines API.
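
A rough sketch of what that could look like (the keep-checkpoint param and the 
cleanup method names below are just placeholders, not a final API):

{noformat}
// Hypothetical: ask the EM optimizer to keep the last checkpoint alive.
// corpus: RDD[(Long, Vector)] prepared elsewhere.
val lda = new LDA()
  .setK(20)
  .setOptimizer(new EMLDAOptimizer().setKeepLastCheckpoint(true)) // placeholder setter

val model = lda.run(corpus).asInstanceOf[DistributedLDAModel]

// toLocal, describeTopics, etc. keep working because the last checkpoint survives.

// Hypothetical cleanup hook on the returned model, called once the user is done.
model.deleteCheckpointFiles() // placeholder name
{noformat}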


was (Author: holdenk):
This sounds useful, although we probably wouldn't want to always leave the last 
checkpoint around (and we would also need to provide a way for the user to clean 
up the last checkpoint).

We could add a getCheckPointedLDAModel method, or offer a param on the current 
method, and then add a cleanup function to the resulting LDAModel for the user 
to call. Any thoughts [~josephkb] or [~mengxr]?

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the 
> DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the 
> DistributedLDAModel in a couple of scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. a very large data 
> set).
> - A worker fails late and needs to re-read the checkpoint the graph still 
> depends on.
> I ran into this problem running a 10M-record LDA model in a memory-starved 
> environment. The model consistently failed either in the {{collect at 
> LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the 
> {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the 
> model). In both cases, a FileNotFoundException is thrown when attempting to 
> access a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature 
> change. An alternative simple fix is to leave the last checkpoint around and 
> expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: 
> /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala:
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}
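> A sketch of the alternative simple fix mentioned above, which keeps only the 
> newest checkpoint for the caller to clean up later (the method name here is 
> hypothetical):
> {noformat}
>   /**
>    * Delete all but the most recent checkpoint, leaving the newest one for the
>    * caller to clean up once the returned model is no longer needed.
>    */
>   def deleteAllCheckpointsButLast(): Unit = {
>     // removeCheckpointFile() dequeues the oldest entry, so stopping at size 1
>     // leaves the most recent checkpoint in place.
>     while (checkpointQueue.size > 1) {
>       removeCheckpointFile()
>     }
>   }
> {noformat}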


