[ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522015#comment-16522015 ]
Sergey Zhemzhitsky commented on SPARK-22150:
--------------------------------------------

Just a kind reminder...

> PeriodicCheckpointer fails with FileNotFoundException in case of dependent
> RDDs
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-22150
>                 URL: https://issues.apache.org/jira/browse/SPARK-22150
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0,
> 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>         Environment: spark 2.2.0
> scala 2.11
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>
> PeriodicCheckpointer fails with FileNotFoundException when checkpointing
> dependent RDDs (consider iterative algorithms), i.e. when the RDD to
> checkpoint depends on an already checkpointed RDD.
> Here is the exception:
> {code}
> Job aborted due to stage failure: Task creation failed:
> java.io.FileNotFoundException: File
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000
> does not exist
> java.io.FileNotFoundException: File
> file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000
> does not exist
> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
> 	at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
> 	at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
> 	at scala.Option.map(Option.scala:146)
> 	at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
> 	at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this
> [piece of code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]:
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>     && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
>     // Delete the oldest checkpoint only if the next checkpoint exists.
>     if (isCheckpointed(checkpointQueue.head)) {
>       removeCheckpointFile()
>     } else {
>       canDelete = false
>     }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is checkpointed and materialized, and that _newData_ depends on _checkpointQueue.head_, the exception occurs when an action is run on the RDDs representing _newData_.
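
For anyone trying to reproduce this, below is a minimal, self-contained sketch of the failure sequence using only the public RDD API (the app name, checkpoint directory, and RDD contents are illustrative, not taken from the report). It mimics what the snippet above does: checkpoint a dependent RDD, delete the parent's checkpoint files before the dependent RDD has been materialized, then run an action.

{code:java}
// Minimal sketch of the SPARK-22150 failure sequence (hypothetical
// standalone job, not taken from the original report).
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SPARK-22150 repro"))
    sc.setCheckpointDir("/tmp/spark-22150-checkpoints") // illustrative path

    val first = sc.parallelize(1 to 100)
    first.checkpoint()
    first.count() // action materializes the first checkpoint

    // Derive a dependent RDD and mark it for checkpointing, but do NOT
    // materialize it yet -- this is the state newData is in when
    // PeriodicCheckpointer deletes the old checkpoint.
    val second = first.map(_ + 1)
    second.checkpoint()

    // Simulate removeCheckpointFile(): delete the parent's checkpoint files.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    first.getCheckpointFile.foreach(dir => fs.delete(new Path(dir), true))

    // Computing `second` now has to read `first`'s deleted checkpoint files,
    // so this action fails with FileNotFoundException.
    second.count()

    sc.stop()
  }
}
{code}

The key point is that _RDD.checkpoint()_ is lazy: _checkpoint(newData)_ in PeriodicCheckpointer only marks the RDD for checkpointing, so removing _checkpointQueue.head_'s files before _newData_ has been materialized leaves _newData_'s lineage pointing at deleted files.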