I want to use "restore from checkpoint" to continue from the last accumulated
word counts and process new streams of data. This recovery should preserve
the accumulated counter state (calculated by updateStateByKey) across a
failure/recovery or a temporary shutdown/upgrade to new code.

However, the recommendation seems to be that you have to delete the
checkpoint data when you upgrade to new code. How would this work if I change
the word count accumulation logic and still want to continue from the last
remembered state? An example would be that the word counts could be weighted
by logic that only applies to streams arriving after a certain point in time.
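
For concreteness, a minimal sketch of that weighting idea, applied to the
same wordDstream as in the snippets below (the weight value and the switch to
a Double state type are illustrative only, not from any Spark API):

  // Hypothetical weighted update: batches processed by the upgraded job
  // count `weight` times as much as before. Note the state type changes
  // from Int to Double, which the old checkpointed state would not match.
  val weight = 2.0
  val weightedUpdateFunc = (values: Seq[Int], state: Option[Double]) => {
    val currentCount = values.sum * weight
    val previousCount = state.getOrElse(0.0)
    Some(currentCount + previousCount)
  }
  val weightedCounts = wordDstream.updateStateByKey[Double](weightedUpdateFunc)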

This is just one example, but there are quite a few scenarios where one needs
to continue from the previous state as remembered by "updateStateByKey" and
apply new logic.
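
One workaround that might fit is a sketch along these lines: have the old job
persist its final counts to external storage, then start the new job without
the old checkpoint and seed it through the updateStateByKey overload that
takes an initialRDD (available since Spark 1.3). The path below is
illustrative, and wordDstream and updateFunc are reused from the snippets
that follow:

  import org.apache.spark.HashPartitioner

  // Load the counts the previous run persisted (path is illustrative)
  val initialCounts =
    ssc.sparkContext.objectFile[(String, Int)]("hdfs:///wordcounts/latest")

  // Seed the state with the previous totals instead of checkpoint data
  val countsDstream = wordDstream.updateStateByKey[Int](
    updateFunc,
    new HashPartitioner(ssc.sparkContext.defaultParallelism),
    initialCounts)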

---- code snippets below ------------

As in the "RecoverableNetworkWordCount" example,
"setupInputStreamAndProcessWordCounts" must be called inside the
context-creation function passed to StreamingContext.getOrCreate. If
"setupInputStreamAndProcessWordCounts" is called outside "createContext",
you get the error "[Receiver-0-1427260249292] is not unique!".

  def createContext(checkPointDir: String, host: String, port: Int): StreamingContext = {
    // If you do not see this printed, the StreamingContext has been
    // loaded from the existing checkpoint data instead
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")

    // Create the context with a 1 second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkPointDir)

    setupInputStreamAndProcessWordCounts(ssc, host, port)

    ssc
  }

and invoke it in main as:

  def main(args: Array[String]) {
    val checkPointDir = "./saved_state"

    if (args.length < 2) {
      System.err.println("Usage: SavedNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val ssc = StreamingContext.getOrCreate(checkPointDir, () => {
        createContext(checkPointDir, args(0), args(1).toInt)
      })

    // Calling setupInputStreamAndProcessWordCounts(ssc, args(0), args(1).toInt)
    // here, outside createContext, triggers the "is not unique!" error above

    ssc.start()
    ssc.awaitTermination()
  }

  def setupInputStreamAndProcessWordCounts(ssc: StreamingContext, hostname: String, port: Int) {
    // The InputDStream has to be created inside createContext, else you get
    // the "is not unique!" error
    val lines = ssc.socketTextStream(hostname, port)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Add each batch's count for a word to its previously accumulated total
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    // Update and print the cumulative count using updateStateByKey
    val countsDstream = wordDstream.updateStateByKey[Int](updateFunc)
    countsDstream.print() // print or save to an external system
  }
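
To have something to reseed from after an upgrade, the print() above could be
replaced (or complemented) by persisting the running totals each batch; a
sketch, with an illustrative output path:

  // Persist the accumulated counts every batch so an upgraded job can
  // later reseed from the most recent output (path is illustrative)
  countsDstream.foreachRDD { (rdd, time) =>
    rdd.saveAsObjectFile(s"hdfs:///wordcounts/${time.milliseconds}")
  }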
  


