Jeff Stein created SPARK-13048:
----------------------------------
Summary: EMLDAOptimizer deletes dependent checkpoint of DistributedLDAModel
Key: SPARK-13048
URL: https://issues.apache.org/jira/browse/SPARK-13048
Project: Spark
Issue Type: Bug
Components: MLlib
Affects Versions: 1.5.2
Environment: Standalone Spark cluster
Reporter: Jeff Stein
In EMLDAOptimizer, all checkpoints are deleted before returning the
DistributedLDAModel.
The most recent checkpoint is still necessary for operations on the
DistributedLDAModel under a couple of scenarios:
- The graph doesn't fit in memory on the worker nodes (e.g. a very large data set)
- Late worker failures that require reading the checkpoint the graph now depends on.
I ran into this problem running a 10M record LDA model in a memory-starved
environment. The model persistently failed either in the {{collect at
LDAModel.scala:528}} stage when converting to a LocalLDAModel or in the
{{reduce at LDAModel.scala:563}} stage when calling {{describeTopics}}. In both
cases, a FileNotFoundException is thrown while attempting to access a checkpoint file.
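For reference, the failure surfaces with a flow roughly like the one below (a sketch only; {{sc}}, the {{corpus}} RDD, and the parameter values are placeholders, not taken from the original run):
{noformat}
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// sc: SparkContext; corpus: RDD[(Long, Vector)] of (doc id, term-count vector), built elsewhere.
sc.setCheckpointDir("/hdfs/path/to/checkpoints")

val ldaModel = new LDA()
  .setK(100)
  .setOptimizer("em")
  .setCheckpointInterval(10)
  .run(corpus)
  .asInstanceOf[DistributedLDAModel]

// Both calls re-read the graph's lineage. If a partition has to be recomputed
// after the checkpoints were deleted, a FileNotFoundException is thrown.
val localModel = ldaModel.toLocal          // "collect at LDAModel.scala:528"
val topics = ldaModel.describeTopics(20)   // "reduce at LDAModel.scala:563"
{noformat}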
I'm not sure what the correct fix is here; it might involve a class signature
change. An alternative, simpler fix is to leave the last checkpoint in place and
expect the user to clean up the checkpoint directory themselves.
{noformat}
java.io.FileNotFoundException: File does not exist: /hdfs/path/to/checkpoints/c8bd2b4e-27dd-47b3-84ec-3ff0bac04587/rdd-635/part-00071
{noformat}
Relevant code is included below.
LDAOptimizer.scala:
{noformat}
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
  require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
  this.graphCheckpointer.deleteAllCheckpoints()
  // The constructor's default arguments assume gammaShape = 100 to ensure equivalence in
  // LDAModel.toLocal conversion
  new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
    Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
    iterationTimes)
}
{noformat}
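For illustration, the simpler fix could keep the most recent checkpoint when building the model, roughly along these lines ({{deleteAllCheckpointsButLast}} is a hypothetical helper, not an existing method):
{noformat}
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
  require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
  // Keep the most recent checkpoint: the returned DistributedLDAModel may still need
  // it to recompute graph partitions. The caller is then responsible for cleaning up
  // the checkpoint directory once finished with the model.
  this.graphCheckpointer.deleteAllCheckpointsButLast()  // hypothetical helper
  new DistributedLDAModel(this.graph, this.globalTopicTotals, this.k, this.vocabSize,
    Vectors.dense(Array.fill(this.k)(this.docConcentration)), this.topicConcentration,
    iterationTimes)
}
{noformat}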
PeriodicCheckpointer.scala:
{noformat}
/**
 * Call this at the end to delete any remaining checkpoint files.
 */
def deleteAllCheckpoints(): Unit = {
  while (checkpointQueue.nonEmpty) {
    removeCheckpointFile()
  }
}

/**
 * Dequeue the oldest checkpointed Dataset, and remove its checkpoint files.
 * This prints a warning but does not fail if the files cannot be removed.
 */
private def removeCheckpointFile(): Unit = {
  val old = checkpointQueue.dequeue()
  // Since the old checkpoint is not deleted by Spark, we manually delete it.
  val fs = FileSystem.get(sc.hadoopConfiguration)
  getCheckpointFiles(old).foreach { checkpointFile =>
    try {
      fs.delete(new Path(checkpointFile), true)
    } catch {
      case e: Exception =>
        logWarning("PeriodicCheckpointer could not remove old checkpoint file: " +
          checkpointFile)
    }
  }
}
{noformat}
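If the hypothetical {{deleteAllCheckpointsButLast}} route were taken, it could mirror {{deleteAllCheckpoints}} but stop before emptying the queue; a rough sketch, not existing code:
{noformat}
/**
 * Delete all remaining checkpoint files except the most recent one, which the
 * returned model may still depend on for recomputation.
 */
def deleteAllCheckpointsButLast(): Unit = {
  // Leave the newest entry in the queue so its files survive; cleaning up the
  // checkpoint directory afterwards is left to the caller.
  while (checkpointQueue.size > 1) {
    removeCheckpointFile()
  }
}
{noformat}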