I want to use "restore from checkpoint" to continue from the last accumulated word counts and process new streams of data. This recovery process should preserve the accumulated counter state (calculated by updateStateByKey) across a failure/recovery or a temporary shutdown/upgrade to new code.
However, the recommendation seems to indicate that you have to delete the checkpoint data if you upgrade to new code. How would this work if I change the word count accumulation logic and still want to continue from the last remembered state? For example, the word counts could be weighted by logic applied to streams arriving from a later point in time. That is just one example; there are quite a few scenarios where one needs to continue from the previous state as remembered by updateStateByKey and apply new logic.

---- code snippets below ----

As in the example RecoverableNetworkWordCount, "setupInputStreamAndProcessWordCounts" should be built only in the context of the retrieved checkpoint. If "setupInputStreamAndProcessWordCounts" is called outside "createContext", you will get the error "[Receiver-0-1427260249292] is not unique!".

    def createContext(checkPointDir: String, host: String, port: Int) = {
      // If you do not see this printed, the StreamingContext has been loaded
      // from the checkpoint instead of being created anew
      println("Creating new context")
      val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
      // Create the context with a 1 second batch size
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint(checkPointDir)
      setupInputStreamAndProcessWordCounts(ssc, host, port)
      ssc
    }

and invoke it in main as

    def main(args: Array[String]) {
      val checkPointDir = "./saved_state"
      if (args.length < 2) {
        System.err.println("Usage: SavedNetworkWordCount <hostname> <port>")
        System.exit(1)
      }
      val ssc = StreamingContext.getOrCreate(checkPointDir, () => {
        createContext(checkPointDir, args(0), args(1).toInt)
      })
      // setupInputStreamAndProcessWordCounts(ssc, args(0), args(1).toInt)
      // (calling it here, outside createContext, triggers the "is not unique!" error)
      ssc.start()
      ssc.awaitTermination()
    }

    def setupInputStreamAndProcessWordCounts(ssc: StreamingContext, hostname: String, port: Int) {
      // The InputDStream has to be created inside createContext, else you get an error
      val lines = ssc.socketTextStream(hostname, port)
      val words = lines.flatMap(_.split(" "))
      val wordDstream = words.map(x => (x, 1))
      val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.sum
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
      }
      // Update and print the cumulative count using updateStateByKey
      val countsDstream = wordDstream.updateStateByKey[Int](updateFunc)
      countsDstream.print() // print or save to an external system
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Recovered-state-for-updateStateByKey-and-incremental-streams-processing-tp22229.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
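One pattern that may sidestep the delete-the-checkpoint restriction is to snapshot the state to external storage from the old job, then start the upgraded job with a fresh checkpoint directory and seed its updateStateByKey with that snapshot via the overload that accepts an initial-state RDD. The sketch below assumes this approach; the snapshot path, the weight constant, and the function name setupWithMigratedState are hypothetical, and the snapshot/restore steps are untested here.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.StreamingContext

// 1) In the OLD job, periodically persist the current state to durable storage
//    (path is hypothetical; a real job would write to a fresh path per batch):
//      countsDstream.foreachRDD { rdd => rdd.saveAsObjectFile("hdfs:///state/counts-latest") }

// 2) In the NEW job (started with a fresh checkpoint directory), load the
//    snapshot and pass it as the initial state for the new accumulation logic:
def setupWithMigratedState(ssc: StreamingContext, hostname: String, port: Int): Unit = {
  // State snapshot written by the old job (hypothetical path)
  val initialState = ssc.sparkContext.objectFile[(String, Int)]("hdfs:///state/counts-latest")

  val words = ssc.socketTextStream(hostname, port).flatMap(_.split(" ")).map((_, 1))

  // New accumulation logic: weight counts arriving after the upgrade
  // (weight of 2 is an arbitrary illustration). The old snapshot enters
  // unweighted through the initial-state RDD.
  val weight = 2
  val weightedUpdate = (values: Seq[Int], state: Option[Int]) =>
    Some(values.sum * weight + state.getOrElse(0))

  // updateStateByKey overload taking (updateFunc, partitioner, initialRDD)
  val counts = words.updateStateByKey[Int](
    weightedUpdate,
    new HashPartitioner(ssc.sparkContext.defaultParallelism),
    initialState)

  counts.print()
}
```

The trade-off is that the snapshot-and-reseed path loses the exactness guarantees of checkpoint recovery (any batches between the last snapshot and the shutdown are not reflected in the restored state), so it fits planned upgrades better than crash recovery.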