Nan Zhu created SPARK-19278:
-------------------------------

             Summary: Failed recovery from checkpoint caused by a 
multi-threading issue in the Spark Streaming scheduler
                 Key: SPARK-19278
                 URL: https://issues.apache.org/jira/browse/SPARK-19278
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3
            Reporter: Nan Zhu


In one of our applications, we found the following issue: the application 
recovers from a checkpoint file named "checkpoint-***166700000" that carries 
the timestamp ***166500000, and therefore recomputes from the very beginning 
of the stream. Because our application relies on external, periodically-cleaned 
data (whose cleanup is synced with checkpoint cleanup), the recovery simply 
fails.

We identified a potential issue in Spark Streaming checkpointing and describe 
it with the following example. We propose a fix at the end of this JIRA.

1. The application's properties: batch duration: 20000; functionality: a 
single stream calling reduceByKeyAndWindow and print; window size: 60000; 
slide duration: 20000
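
For reference, a minimal sketch of such an application; the source, checkpoint 
path, and object name are illustrative assumptions, not our production code:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object WindowedApp {
  def main(args: Array[String]): Unit = {
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("WindowedApp")
      val ssc = new StreamingContext(conf, Milliseconds(20000)) // batch duration 20000
      ssc.checkpoint("/path/checkpoint")                        // illustrative directory
      val words = ssc.socketTextStream("localhost", 9999)       // illustrative source
      val counts = words.map(w => (w, 1L)).reduceByKeyAndWindow(
        _ + _, _ - _,           // reduce and inverse-reduce functions
        Milliseconds(60000),    // window size 60000
        Milliseconds(20000))    // slide duration 20000
      counts.print()
      ssc
    }
    // recover from the checkpoint if one exists, otherwise start fresh
    val ssc = StreamingContext.getOrCreate("/path/checkpoint", createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}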

2. The RDD at 166500000 is generated and the corresponding job is submitted to 
the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to 
JobGenerator's queue
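
For context, the generator side works roughly as in the following 
self-contained model, an abbreviated paraphrase of JobGenerator.generateJobs; 
the stub event type and the println standing in for JobSet submission are ours:

{code:scala}
import java.util.concurrent.LinkedBlockingQueue

// stub mirroring org.apache.spark.streaming.scheduler.JobGeneratorEvent
sealed trait JobGeneratorEvent
case class DoCheckpoint(time: Long, clearCheckpointDataLater: Boolean)
  extends JobGeneratorEvent

object GenerateJobsModel {
  // stands in for the private EventLoop inside JobGenerator (JG-EventLoop)
  val generatorQueue = new LinkedBlockingQueue[JobGeneratorEvent]()

  // paraphrase of JobGenerator.generateJobs(time): hand the JobSet to the
  // execution ThreadPool, then enqueue an asynchronous DoCheckpoint message
  def generateJobs(time: Long): Unit = {
    println(s"jobScheduler.submitJobSet(JobSet($time, jobs))")
    generatorQueue.put(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

  def main(args: Array[String]): Unit = {
    generateJobs(166500000L)
    println(generatorQueue.peek()) // DoCheckpoint(166500000,false) is pending
  }
}
{code}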

3. The job at 166500000 finishes and a JobCompleted message is sent to 
JobScheduler's queue; meanwhile, the job at 166520000 is submitted to the 
execution ThreadPool and, similarly, a DoCheckpoint is sent to JobGenerator's 
queue
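
Job completion is likewise reported as a message rather than handled inline. 
A self-contained model, abbreviated from JobScheduler's JobHandler; the stub 
JobCompleted type is ours (the real one also carries the Job itself):

{code:scala}
import java.util.concurrent.{Executors, LinkedBlockingQueue}

// stub mirroring org.apache.spark.streaming.scheduler.JobSchedulerEvent
case class JobCompleted(time: Long)

object JobHandlerModel {
  val schedulerQueue = new LinkedBlockingQueue[JobCompleted]() // JS-EventLoop's queue
  val jobExecutor = Executors.newFixedThreadPool(1)            // the execution ThreadPool

  // paraphrase of JobScheduler.JobHandler: run the job on the pool, then
  // report completion as a message; nothing here waits on JG-EventLoop
  def submit(time: Long): Unit = jobExecutor.execute { () =>
    println(s"running job for batch $time")
    schedulerQueue.put(JobCompleted(time))
  }

  def main(args: Array[String]): Unit = {
    submit(166500000L)
    println(schedulerQueue.take()) // JobCompleted(166500000)
    jobExecutor.shutdown()
  }
}
{code}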

4. JobScheduler's message-processing thread (I will use JS-EventLoop to 
identify it) is not scheduled by the operating system for a long time, and 
during this period the jobs from 166520000 - 166700000 are generated and 
completed.

5. The message-processing thread of JobGenerator (JG-EventLoop) is scheduled 
and processes all DoCheckpoint messages for the jobs ranging from 166520000 - 
166700000, and the checkpoint files are successfully written. CRITICAL: at 
this moment, lastCheckpointTime is 166700000.
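
This is the crux: the checkpoint file NAME is derived from the largest 
checkpoint time seen so far, not from the time carried by the message. A 
self-contained model, abbreviated from CheckpointWriter.CheckpointWriteHandler 
in Checkpoint.scala:

{code:scala}
object CheckpointWriterModel {
  // mirrors CheckpointWriter.latestCheckpointTime: only ever moves forward
  var latestCheckpointTime: Long = -1L

  def write(checkpointTime: Long): String = {
    if (latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    // the real code calls Checkpoint.checkpointFile(dir, latestCheckpointTime),
    // while the serialized bytes still carry the message's own checkpointTime
    s"/path/checkpoint-$latestCheckpointTime"
  }

  def main(args: Array[String]): Unit = {
    (166520000L to 166700000L by 20000L).foreach(write) // step 5
    println(write(166500000L)) // step 8: /path/checkpoint-166700000
  }
}
{code}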

6. JS-EventLoop is scheduled and processes all JobCompleted messages for the 
jobs ranging from 166520000 - 166700000. CRITICAL: a ClearMetadata message is 
pushed to JobGenerator's queue for EACH JobCompleted.
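
Abbreviated from JobScheduler.handleJobCompletion and 
JobGenerator.onBatchCompletion, a self-contained model of how each completed 
batch turns into a ClearMetadata message (the case class is a stub):

{code:scala}
import java.util.concurrent.LinkedBlockingQueue

case class ClearMetadata(time: Long) // stub of the real JobGeneratorEvent

object JobCompletionModel {
  val generatorQueue = new LinkedBlockingQueue[ClearMetadata]() // JG-EventLoop's queue

  // paraphrase: once the whole JobSet for a batch has completed,
  // jobGenerator.onBatchCompletion(time) posts ClearMetadata(time)
  def handleJobCompletion(time: Long): Unit =
    generatorQueue.put(ClearMetadata(time))

  def main(args: Array[String]): Unit = {
    (166520000L to 166700000L by 20000L).foreach(handleJobCompletion) // step 6
    println(generatorQueue.size) // one ClearMetadata per completed batch
  }
}
{code}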

7. The message queue now contains 20 ClearMetadata messages and JG-EventLoop 
is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs that 
fall outside the rememberDuration window. In our case, reduceByKeyAndWindow 
sets rememberDuration to 100000 (the ReducedWindowedDStream's rememberDuration 
(40000) plus the window size (60000)), so only the RDDs in (166600000, 
166700000] are kept. The ClearMetadata processing logic then pushes a 
DoCheckpoint to JobGenerator's queue
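
A self-contained model of that cleanup and of the extra DoCheckpoint it 
enqueues, abbreviated from JobGenerator.clearMetadata and 
DStream.clearMetadata (RDDs stubbed as strings):

{code:scala}
import scala.collection.mutable

object ClearMetadataModel {
  val rememberDuration = 100000L                      // 40000 + window size 60000
  val generatedRDDs = mutable.HashMap[Long, String]() // time -> RDD, stubbed

  def clearMetadata(time: Long): Unit = {
    // DStream.clearMetadata: forget everything at or before time - rememberDuration
    val oldRDDs = generatedRDDs.filter { case (t, _) => t <= time - rememberDuration }
    generatedRDDs --= oldRDDs.keys
    // JobGenerator.clearMetadata then re-enqueues a checkpoint for this batch
    println(s"post DoCheckpoint($time, clearCheckpointDataLater = true)")
  }

  def main(args: Array[String]): Unit = {
    (166500000L to 166700000L by 20000L).foreach(t => generatedRDDs(t) = s"rdd-$t")
    clearMetadata(166700000L)
    println(generatedRDDs.keys.toSeq.sorted) // only (166600000, 166700000] survive
  }
}
{code}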

8. JG-EventLoop is scheduled again to process the DoCheckpoint for 166500000. 
VERY CRITICAL: at this step, the RDDs no later than 166600000 have already 
been removed, and the checkpoint data is updated as in 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
 and 
https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
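
Paraphrasing the two linked lines in a self-contained model: the files 
recorded into the checkpoint are taken from whatever still sits in 
generatedRDDs, regardless of the batch time being checkpointed (file names 
stubbed):

{code:scala}
import scala.collection.mutable

object DStreamCheckpointDataModel {
  // what survives after step 7's cleanup; in the real code only RDDs with
  // getCheckpointFile defined are considered (the filter on L53)
  val generatedRDDs = mutable.SortedMap(
    166620000L -> "ckpt-166620000", 166640000L -> "ckpt-166640000",
    166660000L -> "ckpt-166660000", 166680000L -> "ckpt-166680000",
    166700000L -> "ckpt-166700000")
  val currentCheckpointFiles = mutable.HashMap[Long, String]()

  // paraphrase of DStreamCheckpointData.update(time): `time` is ignored when
  // selecting the files (L53); the surviving ones replace the old set (L59)
  def update(time: Long): Unit = {
    val checkpointFiles = generatedRDDs
    if (checkpointFiles.nonEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles
    }
  }

  def main(args: Array[String]): Unit = {
    update(166500000L)                       // the stale DoCheckpoint of step 8
    println(currentCheckpointFiles.keys.min) // 166620000: nothing near 166500000
  }
}
{code}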

9. After step 8, a checkpoint file named /path/checkpoint-166700000 is 
created, but it carries the timestamp 166500000. At this moment, the 
application crashed

10. The application recovers from /path/checkpoint-166700000 and tries to get 
the RDD with validTime 166500000. Of course it does not find it and has to 
recompute. In the case of reduceByKeyAndWindow, it needs to recursively slice 
RDDs back to the start of the stream. When the stream depends on external 
data, the recovery will not succeed. In the case of Kafka, the recovered RDDs 
would not be the same as the original ones, since currentOffsets has been 
updated to its value at the moment of 166700000
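
A self-contained sketch of why recovery walks backwards, loosely following 
DStream.getOrCompute and ReducedWindowedDStream.compute; the stream start time 
and the stubbed RDD type are assumptions for illustration:

{code:scala}
import scala.collection.mutable

object RecoveryModel {
  val zeroTime = 166000000L                           // assumed start of the stream
  val slideDuration = 20000L
  val generatedRDDs = mutable.HashMap[Long, String]() // restored metadata: old RDDs gone

  // paraphrase of DStream.getOrCompute: a missing validTime forces a recompute
  def getOrCompute(time: Long): Option[String] =
    generatedRDDs.get(time).orElse(compute(time))

  // stand-in for ReducedWindowedDStream.compute: recurse into older batches
  def compute(time: Long): Option[String] = {
    if (time <= zeroTime) return None // reached the very beginning of the stream
    getOrCompute(time - slideDuration).map(prev => s"recomputed $time from [$prev]")
  }

  def main(args: Array[String]): Unit =
    println(getOrCompute(166500000L)) // None: slicing went back to the stream start
}
{code}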


The proposed fix:

0. A hot-fix would be to set the timestamp of the checkpoint file to 
lastCheckpointTime instead of using the timestamp of the Checkpoint instance 
(any side effects?); a sketch of this idea follows the list.
1. A long-term fix would be to merge JobScheduler and JobGenerator; I don't 
see any necessity to have two threads here.
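
A minimal, self-contained model of hot-fix 0 (the names here are ours, not 
Spark's): stamp the serialized checkpoint with the same lastCheckpointTime 
that names the file, so the embedded time can never lag behind the file name:

{code:scala}
case class Checkpoint(checkpointTime: Long) // stub of streaming's Checkpoint

object HotFixModel {
  var latestCheckpointTime: Long = -1L

  def write(cp: Checkpoint): (String, Checkpoint) = {
    latestCheckpointTime = math.max(latestCheckpointTime, cp.checkpointTime)
    val fileName = s"/path/checkpoint-$latestCheckpointTime"
    // hot-fix: rewrite the checkpoint's time to match the file name
    (fileName, cp.copy(checkpointTime = latestCheckpointTime))
  }

  def main(args: Array[String]): Unit = {
    write(Checkpoint(166700000L))          // step 5 has already reached 166700000
    println(write(Checkpoint(166500000L))) // stale write is now self-consistent
  }
}
{code}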



