[
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-19280:
---------------------------------
Priority: Major (was: Critical)
> Failed Recovery from checkpoint caused by the multi-threads issue in Spark
> Streaming scheduler
> ----------------------------------------------------------------------------------------------
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Reporter: Nan Zhu
> Priority: Major
>
> In one of our applications, we found the following issue: an application
> recovering from a checkpoint file named "checkpoint-***166700000", but
> carrying the internal timestamp ***166500000, will recover from the very
> beginning of the stream. Because our application relies on external,
> periodically-cleaned data (synced with checkpoint cleanup), the recovery
> simply failed.
> We identified a potential issue in Spark Streaming checkpointing and will
> describe it with the following example. We propose a fix at the end of this
> JIRA.
> 1. The application properties: Batch Duration: 20000; Functionality: a single
> stream calling ReduceByKeyAndWindow and print; Window Size: 60000; Slide
> Duration: 20000
> 2. RDD at 166500000 is generated and the corresponding job is submitted to
> the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to the
> queue of JobGenerator
> 3. The job at 166500000 finishes and a JobCompleted message is sent to
> JobScheduler's queue. Meanwhile, the job at 166520000 is submitted to the
> execution ThreadPool and, similarly, a DoCheckpoint is sent to the queue of
> JobGenerator
> 4. JobScheduler's message processing thread (referred to below as
> JS-EventLoop) is not scheduled by the operating system for a long time;
> during this period, the jobs for batches 166520000 - 166700000 are generated
> and completed.
> 5. The message processing thread of JobGenerator (JG-EventLoop) is scheduled
> and processes all DoCheckpoint messages for jobs ranging from 166520000 to
> 166700000, and the checkpoint files are successfully written. CRITICAL: at
> this moment, lastCheckpointTime is 166700000.
> 6. JS-EventLoop is scheduled and processes all JobCompleted messages for jobs
> ranging from 166520000 to 166700000. CRITICAL: a ClearMetadata message is
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The message queue now contains the ClearMetadata messages and JG-EventLoop
> is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs
> outside the rememberDuration window. In our case, ReduceByKeyAndWindow sets
> rememberDuration to 100000 (the rememberDuration of ReducedWindowedDStream
> (40000) plus the window size), so that only RDDs in (166600000, 166700000]
> are kept. The ClearMetadata processing logic also pushes a DoCheckpoint to
> JobGenerator's queue
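The retention arithmetic in step 7 can be sketched as plain Scala (durations in milliseconds; the numbers come from the report, and the names `kept`, `reducedRemember` are illustrative, not Spark's actual fields):

```scala
// Sketch of step 7's retention arithmetic; not Spark's real implementation.
val windowSize = 60000L
val reducedRemember = 40000L                          // rememberDuration of ReducedWindowedDStream
val rememberDuration = reducedRemember + windowSize   // 100000

// Generation times of the batches in flight during steps 4-7.
val rddTimes = 166520000L to 166700000L by 20000L

// ClearMetadata at time t drops every RDD at or before t - rememberDuration.
def kept(clearTime: Long): Seq[Long] =
  rddTimes.filter(_ > clearTime - rememberDuration)

// kept(166700000L) retains only the RDDs in (166600000, 166700000]
```

So after the last ClearMetadata, the RDD for batch 166500000 is gone, which is exactly what step 8 relies on.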
> 8. JG-EventLoop is scheduled again to process the DoCheckpoint for 166500000.
> VERY CRITICAL: at this step, every RDD at or before 166600000 has already
> been removed, and
> checkpoint data is updated as
> checkpoint data is updated as
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
> and
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After step 8, a checkpoint named /path/checkpoint-166700000 is created,
> but with the timestamp 166500000. At this moment, the application crashed
> 10. The application recovers from /path/checkpoint-166700000 and tries to get
> the RDD with validTime 166500000. Of course it does not find it and has to
> recompute. In the case of ReduceByKeyAndWindow, it must recursively slice
> RDDs back to the start of the stream. When the stream depends on external
> data, the recovery will not succeed. In the case of Kafka, the recovered RDDs
> would not be the same as the originals, as currentOffsets has been updated to
> its value at 166700000
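The divergence between the checkpoint file's name and its payload timestamp (steps 5, 8, and 9) can be reproduced with a toy model. This is plain Scala, not Spark's actual CheckpointWriter; the only behavior borrowed from the report is that the file name tracks the newest checkpoint time seen so far, while the payload keeps the batch time of the message being processed:

```scala
import scala.collection.mutable

// Toy model of the JG-EventLoop's DoCheckpoint handling (names illustrative).
var lastCheckpointTime = 0L
val files = mutable.Map[String, Long]()   // file name -> payload batch time

def doCheckpoint(batchTime: Long): Unit = {
  // The file name only ever advances; a stale message cannot move it back.
  if (batchTime > lastCheckpointTime) lastCheckpointTime = batchTime
  files(s"checkpoint-$lastCheckpointTime") = batchTime
}

// Step 5: JG-EventLoop drains DoCheckpoint for 166520000 .. 166700000.
(166520000L to 166700000L by 20000L).foreach(doCheckpoint)
// Step 8: the stale DoCheckpoint for 166500000 is processed last.
doCheckpoint(166500000L)
// Step 9: checkpoint-166700000 now carries payload timestamp 166500000.
```

Recovery then opens checkpoint-166700000, reads 166500000, and asks for an RDD that ClearMetadata already removed.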
> The proposed fix:
> 0. A hot fix would be to set the timestamp of the checkpoint file to
> lastCheckpointTime instead of using the timestamp of the Checkpoint instance
> (any side effects?)
> 1. ClearMetadata should become ClearMetadataAndCheckpoint
> 2. A long-term fix would be to merge JobScheduler and JobGenerator; I don't
> see any need for two threads here
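Hot fix 0 can be sketched the same way: stamp the payload with the same time the file is named after, so the two can never diverge. Again, names here (`writeCheckpoint`, `lastCheckpointTime`) are illustrative stand-ins, not Spark's actual CheckpointWriter API:

```scala
// Sketch of proposed hot fix 0; not Spark's real implementation.
var lastCheckpointTime = 0L

// Returns (fileName, payloadTimestamp); both derive from lastCheckpointTime.
def writeCheckpoint(batchTime: Long): (String, Long) = {
  if (batchTime > lastCheckpointTime) lastCheckpointTime = batchTime
  (s"checkpoint-$lastCheckpointTime", lastCheckpointTime)
}

// Even when the stale DoCheckpoint for 166500000 is processed after
// 166700000, the file name and the recovered timestamp agree:
//   writeCheckpoint(166700000L)
//   writeCheckpoint(166500000L)   // ("checkpoint-166700000", 166700000)
```

On recovery the application would then look up the RDD at 166700000, which is still inside the rememberDuration window.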
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]