[ https://issues.apache.org/jira/browse/SPARK-12137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15054339#comment-15054339 ]

Ravindar edited comment on SPARK-12137 at 12/12/15 2:55 PM:
------------------------------------------------------------

Sean, thanks for the clarification that the current recovery functionality
applies to system failure only. One has to manually delete the checkpoint
directory when the processing steps change as part of an upgrade; that case has
to be treated separately, as application functionality.

For application state continuity in the upgrade scenario, the application has
to explicitly save the last state for *updateStateByKey* in each iteration and
then restore it if a last saved state exists, else create a default value. Or
is this state already present in the existing *checkpointing*, such that it can
be looked up and retrieved?

I am looking for a best practice in this scenario (any streaming examples?),
with the following questions; rough sketches of both options follow the list.

1. Do you serialize/deserialize to/from HDFS, with the key as the file name and
the state as the content?
2. Do you serialize/deserialize to/from Cassandra, with the key and state as
columns?
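
Here is roughly what I have in mind for option 1: the full *updateStateByKey*
state is written to HDFS at the end of each batch and fed back as the initial
state when the upgraded job starts. All paths, types, and the update function
below are made-up placeholders for discussion, not a recommended API.

{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateHandoff {
  // Placeholder location for the latest state snapshot.
  val statePath = "hdfs:///app/state/latest"

  // Restore the state saved by the previous (pre-upgrade) run, if any;
  // otherwise start from an empty default state.
  def loadInitialState(sc: SparkContext): RDD[(String, Long)] = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    if (fs.exists(new Path(statePath))) sc.objectFile[(String, Long)](statePath)
    else sc.emptyRDD[(String, Long)]
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "state-handoff", Seconds(10))
    // Still required by updateStateByKey for lineage truncation, even
    // though we do not rely on it surviving an upgrade.
    ssc.checkpoint("hdfs:///app/ckpt")

    val counts = ssc.socketTextStream("localhost", 9999).map(w => (w, 1L))
    val update = (values: Seq[Long], state: Option[Long]) =>
      Some(values.sum + state.getOrElse(0L))

    val state = counts.updateStateByKey(
      update,
      new HashPartitioner(ssc.sparkContext.defaultParallelism),
      loadInitialState(ssc.sparkContext))

    // Persist the whole state each batch; write to a temp dir and rename
    // so a crash mid-write cannot corrupt the last good snapshot.
    state.foreachRDD { rdd =>
      val fs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
      val tmp = new Path(statePath + ".tmp")
      fs.delete(tmp, true)
      rdd.saveAsObjectFile(tmp.toString)
      fs.delete(new Path(statePath), true)
      fs.rename(tmp, new Path(statePath))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

And roughly what I have in mind for option 2, assuming the DataStax
spark-cassandra-connector (an external library, not part of Spark core) and a
made-up keyspace/table holding key and state columns:

{code:scala}
import com.datastax.spark.connector._

// Save: one upserted row per key each batch; Cassandra writes are upserts,
// so the latest state simply overwrites the previous row for that key.
state.foreachRDD { rdd =>
  rdd.saveToCassandra("app_ks", "session_state", SomeColumns("key", "state"))
}

// Restore at startup (in place of loadInitialState above):
val initial = ssc.sparkContext
  .cassandraTable[(String, Long)]("app_ks", "session_state")
  .select("key", "state")
{code}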



> Spark Streaming State Recovery limitations
> ------------------------------------------
>
>                 Key: SPARK-12137
>                 URL: https://issues.apache.org/jira/browse/SPARK-12137
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.4.1
>            Reporter: Ravindar
>            Priority: Critical
>
> There were multiple threads in the forums asking similar questions without a
> clear answer, hence entering it here.
> We have a streaming application that goes through multi-step processing. In
> some of these steps, stateful operations like *updateStateByKey* are used to
> maintain an accumulated running state (and other state info) over incoming
> RDD streams. As the streaming application is incremental, it is imperative
> that we recover/restore the previous known state in the following two
> scenarios:
>   1. On Spark driver/streaming application failure.
>      In this scenario the driver/streaming application is shut down and
> restarted. The recommended approach is to enable *checkpoint(checkpointDir)*
> and use *StreamingContext.getOrCreate* to restore the context from checkpoint
> state.
>   2. Upgrade of the driver/streaming application with additional processing
> steps.
>      In this scenario, we introduced new steps with downstream processing for
> new functionality, without changes to the existing steps. Upgrading the
> streaming application with the new code fails in *StreamingContext.getOrCreate*,
> as there is a mismatch with the saved checkpoint.
> Both of the above scenarios need a unified approach in which the accumulated
> state is saved and restored. The first approach, restoring from the checkpoint,
> works for driver failure but not for a code upgrade. When the application code
> changes, the recommendation is to delete the checkpoint data when the new code
> is deployed. If so, how do you reconstitute all of the stateful (e.g.
> *updateStateByKey*) information from the last run? Every streaming application
> has to save up-to-date state for each session, represented by a key, and then
> initialize from it when a new session starts for the same key. Does every
> application have to create its own mechanism, given that this is very similar
> to the current state checkpointing to HDFS?
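
For reference, the scenario-1 recovery path described above is the standard
*StreamingContext.getOrCreate* pattern; here is a minimal sketch with the
DStream graph elided (master, app name, and paths are placeholders):

{code:scala}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///app/ckpt"

// The factory runs only when no usable checkpoint exists; otherwise the
// whole context, including updateStateByKey state, is rebuilt from it.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext("local[2]", "recoverable-app", Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the full DStream graph here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
{code}

It is this rebuild from the serialized graph that fails in scenario 2, since
the checkpoint no longer matches the upgraded code.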


