Another good starting point for experimenting with updateStateByKey is the
StatefulNetworkWordCount example. See the streaming examples directory in
the Spark repository.
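For reference, here is a minimal sketch in the spirit of that example (written for this note, not copied from the Spark source). The function passed to updateStateByKey has the shape (Seq[V], Option[S]) => Option[S]: it receives all of a key's new values from the current batch plus that key's previous state, and returns the new state. The name updateRunningCount, the DStream `words`, the StreamingContext `ssc`, and the checkpoint path are illustrative assumptions; the Spark wiring appears only in comments so the function itself runs as plain Scala.

```scala
// Shape expected by updateStateByKey: (new values for a key in this batch,
// previous state for that key) => new state for that key.
def updateRunningCount(newCounts: Seq[Int], runningCount: Option[Int]): Option[Int] =
  Some(runningCount.getOrElse(0) + newCounts.sum) // an empty batch keeps the old count

// Hypothetical wiring in a streaming job (not compiled here; needs spark-streaming,
// `ssc` is assumed to be a StreamingContext and `words` a DStream[String]):
//   ssc.checkpoint("checkpoint-dir")  // stateful operators need a checkpoint directory
//   val runningCounts = words.map(w => (w, 1)).updateStateByKey[Int](updateRunningCount _)

// Simulate three micro-batches for a single key without Spark.
val afterBatch1 = updateRunningCount(Seq(1, 1), None)        // key seen for the first time
val afterBatch2 = updateRunningCount(Seq(1), afterBatch1)     // previous state carried over
val afterBatch3 = updateRunningCount(Seq.empty, afterBatch2)  // no new values for the key
println(afterBatch3) // Some(3)
```

Spark applies the update function to every key that already has state, passing an empty Seq when that key received nothing in the current batch, so the function has to handle that case (here it simply carries the old count forward). Returning None instead of Some(...) removes the key from the state.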

TD



On Thu, Dec 18, 2014 at 6:07 AM, Pierce Lamb
<richard.pierce.l...@gmail.com> wrote:
> I am trying to run stateful Spark Streaming computations over (fake)
> Apache web server logs read from Kafka. The goal is to "sessionize"
> the web traffic similar to this blog post:
> http://blog.cloudera.com/blog/2014/11/how-to-do-near-real-time-sessionization-with-spark-streaming-and-apache-hadoop/
>
> The only difference is that I want to "sessionize" each page the IP
> hits, instead of the entire session. I was able to do this by reading
> from a file of fake web traffic using Spark in batch mode, but now I
> want to do it in a streaming context.
>
> Log files are read from Kafka and parsed into K/V pairs of
>
> (String, (String, Long, Long)) or
>
> (IP, (requestPage, time, time))
>
> I then call "groupByKey()" on these K/V pairs. In batch mode, this
> produces:
>
> (String, CompactBuffer((String, Long, Long), ...)) or
>
> (IP, CompactBuffer((requestPage, time, time), ...))
>
> In a StreamingContext, it produces:
>
> (String, ArrayBuffer((String, Long, Long), ...)) like so:
>
> (183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
>
> However, when the next micro-batch (the next RDD in the DStream)
> arrives, this information is discarded. Ultimately, I want that
> ArrayBuffer to fill up over time as a given IP keeps interacting, and
> then to run some computations on its data to "sessionize" the time
> spent on each page. I believe the operator that makes this possible is
> "updateStateByKey." I'm having some trouble with this operator (I'm
> new to both Spark and Scala); any help is appreciated.
>
> Thus far:
>
>     val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
>
>     def updateGroupByKey(
>         a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
>         b: Option[(String, ArrayBuffer[(String, Long, Long)])]
>     ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
>
>     }
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
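The parsing step described in the question can be sketched as a pure function. This assumes, hypothetically, that each Kafka message is one line in Apache Common Log Format; the regex, the name parseLogLine, and the choice to fill both Long fields with the request time are assumptions for illustration, since the original message doesn't show its parser. Lines that don't match, or whose timestamp can't be parsed, are dropped rather than failing the batch.

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale
import scala.util.Try

// Hypothetical input format (Apache Common Log Format), for example:
//   183.196.254.131 - - [17/Dec/2014:20:56:02 +0000] "GET /test.php HTTP/1.1" 200 512
// Capture groups: client IP, bracketed timestamp, requested page.
val LogLine = """^(\S+) \S+ \S+ \[([^\]]+)\] "\S+ (\S+)[^"]*".*""".r

// Parses timestamps such as "17/Dec/2014:20:56:02 +0000"; month names are English.
val LogTime = DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

// Returns (IP, (requestPage, time, time)) with both times set to the request
// time in epoch milliseconds, or None if the line or its timestamp is malformed.
def parseLogLine(line: String): Option[(String, (String, Long, Long))] = line match {
  case LogLine(ip, timestamp, page) =>
    Try(OffsetDateTime.parse(timestamp, LogTime).toInstant.toEpochMilli).toOption
      .map(millis => (ip, (page, millis, millis)))
  case _ => None
}

println(parseLogLine(
  "183.196.254.131 - - [17/Dec/2014:20:56:02 +0000] \"GET /test.php HTTP/1.1\" 200 512"))
```

In a streaming job it could be applied as `lines.flatMap(line => parseLogLine(line).toList)`, where `lines` is a hypothetical DStream[String] of raw log lines, to produce the ipTimeStamp pairs.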

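A sketch of how the update function could look for the sessionization case, under stated assumptions. updateStateByKey already hands the function every new value for a key in the current batch as a Seq[V], and the key itself is not one of the arguments, so calling groupByKey() first is not needed. With V = (String, Long, Long), the function has type (Seq[(String, Long, Long)], Option[S]) => Option[S]. The state type used here, a hypothetical PageTimes map from each page to its first and last request time for one IP, is an assumption; keeping an ArrayBuffer of raw hits as state would also work but grows with every request.

```scala
// Hypothetical per-IP state: page -> (first request time, last request time), epoch ms.
type PageTimes = Map[String, (Long, Long)]

// Shape expected by updateStateByKey when V = (requestPage, time, time):
// (this batch's hits for one IP, that IP's previous state) => new state.
def updatePageTimes(newHits: Seq[(String, Long, Long)], state: Option[PageTimes]): Option[PageTimes] = {
  val merged = newHits.foldLeft(state.getOrElse(Map.empty[String, (Long, Long)])) {
    case (acc, (page, hitStart, hitEnd)) =>
      // Widen this page's time window so it also covers the new hit.
      val (prevStart, prevEnd) = acc.getOrElse(page, (hitStart, hitEnd))
      acc.updated(page, (math.min(prevStart, hitStart), math.max(prevEnd, hitEnd)))
  }
  Some(merged) // returning None here would drop this IP from the state
}

// Hypothetical wiring (not compiled here; needs spark-streaming, `ssc` is assumed
// to be the StreamingContext). There is no groupByKey() before updateStateByKey.
//   ssc.checkpoint("checkpoint-dir")
//   val pageTimes = ipTimeStamp.updateStateByKey[PageTimes](updatePageTimes _)

// Simulate two micro-batches for one IP without Spark.
val batch1 = updatePageTimes(Seq(("/test.php", 1000L, 1000L), ("/index.html", 2000L, 2000L)), None)
val batch2 = updatePageTimes(Seq(("/test.php", 5000L, 5000L)), batch1)
println(batch2)
```

The time spent on a page is then the last time minus the first time for that entry. To close idle sessions, the function could return None for an IP whose latest hit is older than some timeout, since returning None removes that key from the state.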