[ https://issues.apache.org/jira/browse/SPARK-13048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127179#comment-15127179 ]
holdenk edited comment on SPARK-13048 at 2/1/16 10:43 PM:
----------------------------------------------------------

This sounds useful, although we probably wouldn't want to always leave the last checkpoint around (and we would also need to provide a way for the user to clean up the last checkpoint). We could add a getCheckPointedLDAModel method or offer a param on the current method, and then add a cleanup function to the resulting LDAModel for the user to call. Any thoughts [~josephkb] or [~mengxr]? Another interesting question is how to expose something similar in the pipelines API.


was (Author: holdenk):
This sounds useful, although we probably wouldn't want to always leave the last checkpoint around (and we would also need to provide a way for the user to clean up the last checkpoint). We could add a getCheckPointedLDAModel method or offer a param on the current method, and then add a cleanup function to the resulting LDAModel for the user to call. Any thoughts [~josephkb] or [~mengxr]?

> EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
> ------------------------------------------------------------------
>
>                 Key: SPARK-13048
>                 URL: https://issues.apache.org/jira/browse/SPARK-13048
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 1.5.2
>         Environment: Standalone Spark cluster
>            Reporter: Jeff Stein
>
> In EMLDAOptimizer, all checkpoints are deleted before returning the
> DistributedLDAModel.
> The most recent checkpoint is still necessary for operations on the
> DistributedLDAModel in a couple of scenarios:
> - The graph doesn't fit in memory on the worker nodes (e.g. very large data
> set).
> - Late worker failures that require reading the now-dependent checkpoint.
> I ran into this problem running a 10M-record LDA model in a memory-starved
> environment.
> The model consistently failed in either the {{collect at
> LDAModel.scala:528}} stage (when converting to a LocalLDAModel) or in the
> {{reduce at LDAModel.scala:563}} stage (when calling "describeTopics" on the
> model). In both cases, a FileNotFoundException is thrown attempting to access
> a checkpoint file.
> I'm not sure what the correct fix is here; it might involve a class signature
> change. An alternative simple fix is to leave the last checkpoint around and
> expect the user to clean the checkpoint directory themselves.
> {noformat}
> java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
> {noformat}
> Relevant code is included below.
> LDAOptimizer.scala:
> {noformat}
>   override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
>     require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
>     this.graphCheckpointer.deleteAllCheckpoints()
>     // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
>     // LDAModel.toLocal conversion
>     new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
>       Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
>       iterationTimes)
>   }
> {noformat}
> PeriodicCheckpointer.scala
> {noformat}
>   /**
>    * Call this at the end to delete any remaining checkpoint files.
>    */
>   def deleteAllCheckpoints(): Unit = {
>     while (checkpointQueue.nonEmpty) {
>       removeCheckpointFile()
>     }
>   }
>   /**
>    * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
>    * This prints a warning but does not fail if the files cannot be removed.
>    */
>   private def removeCheckpointFile(): Unit = {
>     val old = checkpointQueue.dequeue()
>     // Since the old checkpoint is not deleted by Spark, we manually delete it.
>     val fs = FileSystem.get(sc.hadoopConfiguration)
>     getCheckpointFiles(old).foreach { checkpointFile =>
>       try {
>         fs.delete(new Path(checkpointFile), true)
>       } catch {
>         case e: Exception =>
>           logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
>             checkpointFile)
>       }
>     }
>   }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
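
For illustration only, the idea discussed in the comment above (keep the most recent checkpoint when the model is returned, and give the user a cleanup call on the model) might be sketched roughly as follows. This is not Spark code: CheckpointFiles, CheckpointTracker, CheckpointedModel, deleteAllCheckpointsButLast and deleteCheckpointFiles are hypothetical names chosen for this example, and it assumes plain local directories via java.io.File instead of the Hadoop FileSystem used by PeriodicCheckpointer.

```scala
import java.io.File
import scala.collection.mutable

object CheckpointFiles {
  // Delete a file or a whole directory tree; does nothing if it is already gone.
  def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for anything that is not a directory.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
  }
}

// Hypothetical stand-in for PeriodicCheckpointer: checkpoint dirs, oldest first.
class CheckpointTracker {
  private val checkpointQueue = mutable.Queue.empty[File]

  def add(checkpointDir: File): Unit = checkpointQueue.enqueue(checkpointDir)

  // Remove every checkpoint except the newest one and return what is left,
  // so the caller can hand the surviving checkpoint to the model.
  def deleteAllCheckpointsButLast(): Seq[File] = {
    while (checkpointQueue.size > 1) {
      CheckpointFiles.deleteRecursively(checkpointQueue.dequeue())
    }
    checkpointQueue.toList
  }
}

// Hypothetical stand-in for DistributedLDAModel: it remembers the checkpoint
// it may still need to read and lets the user remove it explicitly.
class CheckpointedModel(val checkpointFiles: Seq[File]) {
  def deleteCheckpointFiles(): Unit =
    checkpointFiles.foreach(CheckpointFiles.deleteRecursively)
}
```

Under these assumptions, the optimizer would build the model from tracker.deleteAllCheckpointsButLast() rather than deleting everything, and the user would call deleteCheckpointFiles() once the model is no longer needed. Whether this belongs behind a param or a separate getter is the open question raised in the comment.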