[ 
https://issues.apache.org/jira/browse/SPARK-17885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cosmin Ciobanu updated SPARK-17885:
-----------------------------------
    Description: 
The issue is that the Spark driver checkpoints an RDD, deletes it, the job 
restarts, and the new driver tries to load the deleted checkpoint RDD.

The application runs in YARN, which attempts to restart the application a 
number of times (100 in our case), all of which fail because the deleted RDD 
is missing. 

Here is a Splunk log which shows the inconsistency in checkpoint behaviour:

*2016-10-09 02:48:43,533* [streaming-job-executor-0] INFO  
org.apache.spark.rdd.ReliableRDDCheckpointData - Done checkpointing RDD 73847 
to hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*, 
new parent is RDD 73872
host = ip-10-1-1-13.ec2.internal
*2016-10-09 02:53:14,696* [JobGenerator] INFO  
org.apache.spark.streaming.dstream.DStreamCheckpointData - Deleted checkpoint 
file 
'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*' 
for time 1475981310000 ms
host = ip-10-1-1-13.ec2.internal
*Job restarts here; notice the driver host changes from 
ip-10-1-1-13.ec2.internal to ip-10-1-1-25.ec2.internal.*
*2016-10-09 02:53:30,175* [Driver] INFO  
org.apache.spark.streaming.dstream.DStreamCheckpointData - Restoring 
checkpointed RDD for time 1475981310000 ms from file 
'hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*'
host = ip-10-1-1-25.ec2.internal
*2016-10-09 02:53:30,491* [Driver] ERROR 
org.apache.spark.deploy.yarn.ApplicationMaster - User class threw exception: 
java.lang.IllegalArgumentException: requirement failed: Checkpoint directory 
does not exist: 
hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*
java.lang.IllegalArgumentException: requirement failed: Checkpoint directory 
does not exist: 
hdfs://proc-job/checkpoint/cadf8dcf-ebc2-4366-a2e1-0939976c6ce1/*rdd-73847*
host = ip-10-1-1-25.ec2.internal

Spark Streaming is configured with a microbatch interval of 30 seconds, a 
checkpoint interval of 120 seconds, and spark.cleaner.ttl of 28800 (8 hours), 
but as far as I can tell, this TTL only affects the metadata cleanup interval. 
RDDs seem to be deleted 4-5 minutes after being checkpointed.
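
For reference, the setup described above corresponds to roughly the following 
(a minimal sketch; the app name, checkpoint path, and input stream are 
placeholders, not our actual job):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("proc-job") // placeholder name
  .set("spark.cleaner.ttl", "28800") // 8 hours; appears to govern metadata cleanup only

// 30-second microbatch interval
val ssc = new StreamingContext(conf, Seconds(30))
ssc.checkpoint("hdfs://proc-job/checkpoint") // placeholder checkpoint root

// The 120-second checkpoint interval is set per DStream, e.g.:
// inputStream.checkpoint(Seconds(120))
```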

Running on top of Spark 1.5.1.

There are at least two possible issues here:
- In case of a driver restart, the new driver tries to load checkpointed RDDs 
which the previous driver had just deleted;
- Spark loads stale checkpointed data: the logs show that the deleted RDD was 
initially checkpointed 4 minutes and 31 seconds before deletion, and 4 minutes 
and 47 seconds before the new driver tried to load it. Given that the 
checkpoint interval is 120 seconds, it makes no sense to load data older than 
that.
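
For context, the "requirement failed" message in the log comes from a 
require-style precondition at restore time; paraphrased from the 1.5.x source 
(a sketch, not an exact copy), it looks roughly like:

```scala
// Sketch of the restore-time check in ReliableCheckpointRDD (paraphrased):
// the new driver resolves the checkpoint directory and fails hard if the
// previous driver has already deleted it.
val cpDirPath = new Path(checkpointPath)
val fs = cpDirPath.getFileSystem(sc.hadoopConfiguration)
require(fs.exists(cpDirPath),
  s"Checkpoint directory does not exist: $cpDirPath")
```

Since the check is a hard `require`, a single missing directory is enough to 
kill every restart attempt.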

P.S. Looking at the source code of the event loop that handles checkpoint 
updates and cleanup, nothing seems to have changed in more recent versions of 
Spark, so the bug is likely present in 2.0.1 as well.
P.P.S. The issue is difficult to reproduce: it only occurs once in every 10 or 
so restarts, and only in clusters under high load.

> Spark Streaming deletes checkpointed RDD then tries to load it after restart
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-17885
>                 URL: https://issues.apache.org/jira/browse/SPARK-17885
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.5.1
>            Reporter: Cosmin Ciobanu
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
