Another good way to start playing with updateStateByKey is the StatefulNetworkWordCount example. See the streaming examples directory in the Spark repository.
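For the sessionization use case below, here is a minimal sketch of how the pieces fit together. Note that updateStateByKey already groups values by key, so the groupByKey() step is unnecessary: the update function is called once per key per batch with that batch's values and the previous state. The socketTextStream source and the field layout are placeholders standing in for the Kafka-derived stream described in the question.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object Sessionize {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Sessionize")
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("checkpoint") // updateStateByKey requires a checkpoint dir

        // Placeholder source; in the question this comes from Kafka and is
        // parsed into (IP, (requestPage, time, time)).
        val ipTimeStamp = ssc.socketTextStream("localhost", 9999).map { line =>
          val f = line.split(" ")
          (f(0), (f(1), f(2).toLong, f(3).toLong))
        }

        // Called once per key per batch: `newHits` is this batch's values for
        // the key, `state` is the buffer accumulated so far. Returning Some(...)
        // keeps the key's state; returning None would drop it.
        def updateSession(
            newHits: Seq[(String, Long, Long)],
            state: Option[Seq[(String, Long, Long)]]): Option[Seq[(String, Long, Long)]] = {
          Some(state.getOrElse(Seq.empty) ++ newHits)
        }

        // No groupByKey() needed: updateStateByKey groups by key itself.
        val sessions = ipTimeStamp.updateStateByKey(updateSession)
        sessions.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

From `sessions` you can then map over each key's buffer to compute per-page session times.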
TD

On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb <richard.pierce.l...@gmail.com> wrote:
> I am trying to run stateful Spark Streaming computations over (fake)
> apache web server logs read from Kafka. The goal is to "sessionize"
> the web traffic similar to this blog post:
> http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>
> The only difference is that I want to "sessionize" each page the IP
> hits, instead of the entire session. I was able to do this reading
> from a file of fake web traffic using Spark in batch mode, but now I
> want to do it in a streaming context.
>
> Log files are read from Kafka and parsed into K/V pairs of
>
> (String, (String, Long, Long)) or
>
> (IP, (requestPage, time, time))
>
> I then call "groupByKey()" on this K/V pair. In batch mode, this would
> produce a:
>
> (String, CollectionBuffer((String, Long, Long), ...)) or
>
> (IP, CollectionBuffer((requestPage, time, time), ...))
>
> In a StreamingContext, it produces a:
>
> (String, ArrayBuffer((String, Long, Long), ...)) like so:
>
> (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
> However, as the next microbatch (DStream) arrives, this information is
> discarded. Ultimately what I want is for that ArrayBuffer to fill up
> over time as a given IP continues to interact and to run some
> computations on its data to "sessionize" the page time. I believe the
> operator to make that happen is "updateStateByKey." I'm having some
> trouble with this operator (I'm new to both Spark & Scala); any help
> is appreciated.
>
> Thus far:
>
> val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
> def updateGroupByKey(
>     a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>     b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>   ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>
> }
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> ---------------------------------------------------------------------