1. Yeah, I think the confusion is that we give the examples of stateful processing without saying why. I tried to make it a little more clear.
2. Tried to make the transition a little more clear. Samza kind of assumes at least passing familiarity with Kafka, so to some extent this is unavoidable, but I think the problem is that it isn't clear why we are talking about Kafka (a stream implementation) in the first place.
3. Not sure how to clarify the diagram (any suggestions?), but hopefully I improved the text.
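To make point 1 concrete, here is roughly what the "task state" is for the windowed-aggregation example. This is a hypothetical sketch in plain Java using an in-memory map, not the real Samza task or store API — the class and method names are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the "task state" for the windowed-aggregation example
// is just a per-user page-view counter that lives between messages and is
// emitted and reset at each window boundary.
public class PageViewCounter {
    private final Map<String, Integer> countsByUser = new HashMap<>();

    // Called once per incoming page-view message.
    public void process(String userId) {
        countsByUser.merge(userId, 1, Integer::sum);
    }

    // Called at each window boundary (e.g. hourly): snapshot and reset the state.
    public Map<String, Integer> window() {
        Map<String, Integer> snapshot = new HashMap<>(countsByUser);
        countsByUser.clear();
        return snapshot;
    }
}
```

The map here is exactly the state that has to be made fault-tolerant: if the process dies mid-window, whatever is in `countsByUser` is lost unless it is stored somewhere recoverable.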
Let me know if you feel this helps:

diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
index c23c7c3..2f9b740 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -5,25 +5,25 @@ title: State Management
 
 One of the more interesting aspects of Samza is the ability for tasks to store data locally and execute rich queries against it.
 
-Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
+Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data and maintain state between rows. The rest of SQL, multi-row aggregations and joins, require more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation and joins and other stateful processing easy to implement.
 
 Let's dive into how this works and why it is useful.
 
 ### Common use cases for stateful processing
 
-First, let's look at some simplistic examples of stateful stream processing that might be seen on a consumer website.
+First, let's look at some simplistic examples of stateful stream processing that might be seen on a consumer website. Later in this document we'll go through specific details of using Samza's built-in key-value storage capabilities to implement each of these applications.
 
 ##### Windowed aggregation
 
 Example: Counting the number of page views for each user per hour
 
-This kind of windowed processing is common for ranking and relevance, "trending topics", as well as simple real-time reporting and monitoring.
+This kind of windowed processing is common for ranking and relevance, "trending topics", as well as simple real-time reporting and monitoring. For small windows one can just maintain the aggregate in memory and manually commit the task position only at window boundaries. However this means we have to recover up to a full window on fail-over. But this will be very slow for large windows because of the amount of reprocessing. For larger windows or for effectively infinite windows it is better to make the in-process aggregation fault-tolerant rather than try to recompute it.
 
 ##### Table-table join
 
 Example: Join a table of user profiles to a table of user\_settings by user\_id and emit the joined stream
 
-This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows are emensely complex and contain many complex processing stages that effectively join together and normalize many data sources into a single clean feed.
+This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows is immensely complex and contains many processing stages that effectively join together and normalize many data sources into a single clean feed.
 
 ##### Table-stream join
 
@@ -53,7 +53,7 @@ This approach works well enough if the in-memory state consists of only a few va
 
 #### Using an external store
 
-In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store. You get something that looks like this:
+In the absence of built-in support a common pattern for stateful processing is to push any state that would be accumulated between rows into an external database or key-value store. The database holds aggregates or the dataset being queried to enrich the incoming stream. You get something that looks like this:
 
@@ -63,7 +63,7 @@ Samza allows this style of processing (nothing will stop you from querying a rem
 
 To understand why this is useful let's first understand some of the drawbacks of making remote queries in a stream processing job:
 
-1. **Performance**: The first major drawback of making remote queries is that they are slow and expensive. A Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn't match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.
+1. **Performance**: The first major drawback of making remote queries is that they are slow and expensive. For example, a Kafka stream can deliver hundreds of thousands or even millions of messages per second per CPU core because it transfers large chunks of data at a time. But a remote database query is a more expensive proposition. Though the database may be partitioned and scalable this partitioning doesn't match the partitioning of the job into tasks so batching becomes much less effective. As a result you would expect to get a few thousand queries per second per core for remote requests. This means that adding a processing stage that uses an external database will often reduce the throughput by several orders of magnitude.
 1. **Isolation**: If your database or service is also running live processing, mixing in asynchronous processing can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing, when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render mobile ui or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.
 1. **Query Capabilities**: Many scalable databases expose very limited query interfaces--only supporting simple key-value lookups. Doing the equivalent of a "full table scan" or rich traversal may not be practical in this model.
 1. **Correctness**: If your task keeps counts or otherwise modifies state in a remote store how is this rolled back if the task fails?
@@ -80,6 +80,8 @@ You can think of this as taking the remote table out of the remote database and
 
 Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: in general you just repartition on the key by which you want to split your processing and then you have full local access to the data within storage in that partition.
 
+In cases where we were querying the external database on each input message to join on additional data for our output stream we would now instead create an input stream coming from the remote database that captures the changes to the database.
+
 Let's look at how this addresses the problems of the remote store:
 
 1. This fixes the performance issues of remote queries because the data is now local, what would otherwise be a remote query may now just be a lookup against local memory or disk (we ship a [LevelDB](https://code.google.com/p/leveldb)-based store which is described in detail below).

On Sun, Sep 8, 2013 at 12:21 PM, Tejas Patil <[email protected]> wrote:

> I am probably the dumbest person on this list in terms of technical
> know-how, but hey,.. if I can understand the doc, then most people would
> understand easily :)
>
> Some comments:
>
> (1) What does a typical task state consist of ? An explicit example of
> "task state" would be helpful. There are couple of good examples in the doc
> but none of them say "hey, for this use case the task state is .."
>
> (2) "The problems of remote stores" -> "Performance":
> Before this point, there was no reference of Kafka at all in the doc and
> you suddenly start comparing things with Kafka stream. People w/o any Kafka
> background would not get that part.
>
> (3) "Approaches to managing task state" -> "Using an external store"
> The figure gave me an impression that tasks' o/p goes to 2 places: o/p
> stream and external store. However reading further made me realize that we
> just push the task state to the external DB
> which is different from o/p stream...right ?
>
> Trivial things:
> - "A simple analogy to SQL may make make this more obvious." : Word "make"
> occurs twice
> - The hyperlink for "database of the web" is not working
>
> Thanks,
> Tejas
>
>
> On Thu, Sep 5, 2013 at 5:14 PM, Jay Kreps <[email protected]> wrote:
>
> > I took a pass at improving the state management documentation (talking to
> > people, I don't think anyone understood what we were saying):
> >
> > http://samza.incubator.apache.org/learn/documentation/0.7.0/container/state-management.html
> >
> > I would love to get some feedback on this, especially from anyone who
> > doesn't already know Samza. Does this make any sense? Does it tell you
> > what you need to know in the right order?
> >
> > -Jay
>
