[ https://issues.apache.org/jira/browse/SPARK-22150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16522015#comment-16522015 ]

Sergey Zhemzhitsky commented on SPARK-22150:
--------------------------------------------

Just a kind reminder...

> PeriodicCheckpointer fails with FileNotFoundException in case of dependent 
> RDDs
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-22150
>                 URL: https://issues.apache.org/jira/browse/SPARK-22150
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 
> 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>         Environment: spark 2.2.0
> scala 2.11
>            Reporter: Sergey Zhemzhitsky
>            Priority: Major
>
> PeriodicCheckpointer fails with FileNotFoundException when checkpointing 
> dependent RDDs (as happens in iterative algorithms), i.e. when the RDD to 
> checkpoint depends on an already checkpointed RDD.
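> For illustration, here is a minimal sketch of the iterative pattern that 
> triggers the failure. It assumes access to the private[spark] 
> PeriodicRDDCheckpointer (believed to live in org.apache.spark.rdd.util in 
> Spark 2.2, hence the package declaration below); the object name and the 
> checkpoint directory are arbitrary.
> {code:java}
> package org.apache.spark  // needed because PeriodicRDDCheckpointer is private[spark]
> 
> import org.apache.spark.rdd.RDD
> import org.apache.spark.rdd.util.PeriodicRDDCheckpointer
> 
> object Spark22150Repro {  // hypothetical driver, for illustration only
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(
>       new SparkConf().setMaster("local[2]").setAppName("SPARK-22150"))
>     sc.setCheckpointDir("/tmp/spark-checkpoints")
> 
>     // Checkpoint on every update to make the failure deterministic.
>     val checkpointer = new PeriodicRDDCheckpointer[Int](1, sc)
> 
>     var rdd: RDD[Int] = sc.parallelize(1 to 100)
>     for (_ <- 1 to 3) {
>       // The new RDD depends on the previous, already checkpointed one.
>       rdd = rdd.map(_ + 1)
>       // update() marks rdd for checkpointing (lazily) and then deletes
>       // the previous, already materialized checkpoint files.
>       checkpointer.update(rdd)
>       // The action materializes rdd; from the second iteration on, its
>       // lineage still reads the files that update() has just deleted,
>       // failing with java.io.FileNotFoundException.
>       rdd.count()
>     }
>   }
> }
> {code}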
> Here is the exception:
> {code}
> Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000 does not exist
> java.io.FileNotFoundException: File file:/tmp/spark-e2046e49-5d8a-4633-b525-52a00bab32f0/17d9b542-d48d-4f23-b368-3957a42644a6/rdd-20/part-00000 does not exist
>       at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:539)
>       at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:752)
>       at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:529)
>       at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
>       at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
>       at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>       at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:274)
>       at scala.Option.map(Option.scala:146)
>       at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1697)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1708)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1707)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1707)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1705)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1705)
>       at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1671)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:989)
>       at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:987)
> {code}
> The issue seems to be in this [piece of code|https://github.com/apache/spark/blob/0a7f5f2798b6e8b2ba15e8b3aa07d5953ad1c695/core/src/main/scala/org/apache/spark/util/PeriodicCheckpointer.scala#L94]:
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
>     // Delete the oldest checkpoint only if the next checkpoint exists.
>     if (isCheckpointed(checkpointQueue.head)) {
>       removeCheckpointFile()
>     } else {
>       canDelete = false
>     }
>   }
> }
> {code}
> Given that _checkpointQueue.head_ is already checkpointed and materialized, 
> and _newData_ depends on _checkpointQueue.head_, the old checkpoint files 
> are removed while the lineage of _newData_ still references them (the call 
> to _checkpoint(newData)_ is lazy and writes nothing until an action runs), 
> so the exception is thrown by the next action on the RDDs representing 
> _newData_.
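> One possible guard (a sketch only, not necessarily the fix that gets 
> committed): postpone deleting old checkpoints until the newest enqueued 
> checkpoint has been materialized, because until then its lineage may still 
> read the files about to be removed.
> {code:java}
> if (checkpointInterval != -1 && (updateCount % checkpointInterval) == 0
>   && sc.getCheckpointDir.nonEmpty) {
>   // Add new checkpoint before removing old checkpoints.
>   checkpoint(newData)
>   checkpointQueue.enqueue(newData)
>   // Remove checkpoints before the latest one, but only once the latest
>   // one is materialized: until then, newData's lineage still contains
>   // the older checkpointed RDDs and reads their files.
>   var canDelete = true
>   while (checkpointQueue.size > 1 && canDelete) {
>     // Delete the oldest checkpoint only if both the newest checkpoint is
>     // materialized (lineage truncated) and the next checkpoint exists.
>     if (isCheckpointed(checkpointQueue.last) &&
>         isCheckpointed(checkpointQueue.head)) {
>       removeCheckpointFile()
>     } else {
>       canDelete = false
>     }
>   }
> }
> {code}
> With this guard the oldest checkpoint survives one extra update cycle, 
> trading a little disk space for correctness.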


