Updated Branches:
  refs/heads/master 0ac53dc5d -> ecbebc6e5

Misc. improvements to the state management docs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ecbebc6e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ecbebc6e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ecbebc6e

Branch: refs/heads/master
Commit: ecbebc6e55eab4841414add77a9a7816fd13a82a
Parents: 0ac53dc
Author: Jay Kreps <[email protected]>
Authored: Wed Sep 11 17:27:55 2013 -0700
Committer: Jay Kreps <[email protected]>
Committed: Wed Sep 11 17:27:55 2013 -0700

----------------------------------------------------------------------
 .reviewboardrc                                  |  1 +
 .../0.7.0/container/state-management.md         | 62 ++++++++++++--------
 2 files changed, 38 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ecbebc6e/.reviewboardrc
----------------------------------------------------------------------
diff --git a/.reviewboardrc b/.reviewboardrc
new file mode 100644
index 0000000..0ee6a71
--- /dev/null
+++ b/.reviewboardrc
@@ -0,0 +1 @@
+REPOSITORY = 'git://git.apache.org/incubator-samza.git'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ecbebc6e/docs/learn/documentation/0.7.0/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/state-management.md b/docs/learn/documentation/0.7.0/container/state-management.md
index a96655c..cafb0f8 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -3,9 +3,9 @@ layout: page
 title: State Management
 ---
 
-One of the more interesting aspects of Samza is the ability for tasks to store 
data locally and execute rich queries against it.
+One of the more interesting aspects of Samza is the ability for tasks to store 
data locally and execute rich queries on this data.
 
-Of course simple filtering or single-row transformations can be done without 
any need for collecting state. A simple analogy to SQL may make this more 
obvious. The select- and where-clauses of a SQL query don't usually require 
state: these can be executed a row at a time on input data and maintain state 
between rows. The rest of SQL, multi-row aggregations and joins, require more 
support to execute correctly in a streaming fashion. Samza doesn't provide a 
high-level language like SQL but it does provide lower-level primitives that 
make streaming aggregation and joins and other stateful processing easy to 
implement.
+Of course simple filtering or single-row transformations can be done without 
any need for collecting state. A simple analogy to SQL may make this more 
obvious. The select- and where-clauses of a SQL query don't usually require 
state: these can be executed a row at a time on input data and maintain state 
between rows. The rest of SQL, multi-row aggregations and joins, require 
accumulating state between rows. Samza doesn't provide a high-level language 
like SQL but it does provide lower-level primitives that make streaming 
aggregation and joins and other stateful processing easy to implement.
 
 Let's dive into how this works and why it is useful.
 
@@ -17,17 +17,23 @@ First, let's look at some simplistic examples of stateful 
stream processing that
 
 Example: Counting the number of page views for each user per hour
 
-This kind of windowed processing is common for ranking and relevance, 
"trending topics", as well as simple real-time reporting and monitoring. For 
small windows one can just maintain the aggregate in memory and manually commit 
the task position only at window boundaries. However this means we have to 
recover up to a full window on fail-over. But this will not be very slow for 
large windows because of the amount of reprocessing. For larger windows or for 
effectively infinite windows it is better to make the in-process aggregation 
fault-tolerant rather than try to recompute it.
+This kind of windowed processing is common for ranking and relevance, 
detecting "trending topics", as well as simple real-time reporting and 
monitoring. For small windows one can just maintain the aggregate in memory and 
manually commit the task position only at window boundaries. However this means 
we have to recover up to a full window on fail-over, which will be very slow 
for large windows due to the amount of reprocessing. For large (or infinite!) 
windows it is better to make the in-process aggregation fault-tolerant rather 
than try to recompute it.
 
 ##### Table-table join
 
-Example: Join a table of user profiles to a table of user\_settings by 
user\_id and emit the joined stream
+Example: Join a table of user profiles to a table of user\_settings by 
user\_id and emit a "materialized view" of the joined stream
 
-This example is somewhat simplistic: one might wonder why you would want to 
join two tables in a stream processing system. However consider a more 
realistic example: real-time data normalization. E-commerce companies need to 
handle product imports, web-crawlers need to update their [database of the 
web](http://labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to 
normalize and index social data for search. Each of these processing flows are 
immensely complex and contain many complex processing stages that effectively 
join together and normalize many data sources into a single clean feed.
+This example is somewhat simplistic: one might wonder why you would want to 
join two tables in a stream processing system. However real-life examples are 
often far more complex then what would normally be considered the domain of 
materialized views over tables. Consider a few examples of real-time data 
normalization:
 
-##### Table-stream join
+* E-commerce companies like Amazon and EBay need to import feeds of 
merchandise from merchants, normalize them by product, and present products 
with all the associated merchants and pricing information.
+* Web search requires building a crawler which creates essentially a [table of 
web page contents](http://labs.yahoo.com/files/YahooWebmap.pdf) and joins on 
all the relevance attributes such as page CTR or pagerank.
+* Social networks take feeds of user-entered text and need to normalize out 
entities such as companies, schools, and skills.
+
+Each of these use cases is a massively complex data normalization problem that 
can be thought of as constructing a very complex materialized view over many 
input tables.
 
-Example: Join user region information to page view data
+##### Stream-table join
+
+Example: Join user region information on to a stream of page views to create 
an augmented stream of page view with region.
 
 Joining side-information to a real-time feed is a classic use for stream 
processing. It's particularly common in advertising, relevance ranking, fraud 
detection and other domains. Activity data such as page views are generally 
captured with only a few primary keys, the additional attributes about the 
viewer and viewed items that are needed for processing need to joined on 
after-the-fact.
 
@@ -47,7 +53,7 @@ So how do systems support this kind of stateful processing? 
We'll lead in by des
 
 #### In-memory state with checkpointing
 
-A simple approach common in academic stream processing systems is to simply to 
periodically save out the state of the task's in-memory data. S4's [state 
management](http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance) 
implements this approach&mdash;tasks implement Java's serializable interface 
and are periodically serialized using java serialization to save out copies of 
the processor state.
+A simple approach, common in academic stream processing systems, is to 
periodically save out the state of the task's in-memory data. S4's [state 
management](http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance) 
implements this approach&mdash;tasks implement Java's serializable interface 
and are periodically serialized using java serialization to save out copies of 
the processor state.
 
 This approach works well enough if the in-memory state consists of only a few 
values. However since you have to save out the complete task state on each save 
this will become increasingly expensive as task state grows. Unfortunately most 
use cases we have seen revolve around joins and aggregation and so have large 
amounts of state&mdash;often many gigabytes. This makes periodic full dumps 
extremely impractical. Some academic systems handle this case by having the 
tasks produce "diffs" in addition to full checkpoints. However this requires a 
great deal of complexity in the task to track what has changed and efficiently 
produce a compact diff of changes.
 
@@ -64,7 +70,7 @@ Samza allows this style of processing (nothing will stop you 
from querying a rem
 To understand why this is useful let's first understand some of the drawbacks 
of making remote queries in a stream processing job:
 
 1. **Performance**: The first major drawback of making remote queries is that 
they are slow and expensive. For example, a Kafka stream can deliver hundreds 
of thousands or even millions of messages per second per CPU core because it 
transfers large chunks of data at a time. But a remote database query is a more 
expensive proposition. Though the database may be partitioned and scalable this 
partitioning doesn't match the partitioning of the job into tasks so batching 
becomes much less effective. As a result you would expect to get a few thousand 
queries per second per core for remote requests. This means that adding a 
processing stage that uses an external database will often reduce the 
throughput by several orders of magnitude.
-1. **Isolation**: If your database or service is also running live processing, 
mixing in asynchronous processing can be quite dangerous. A scalable stream 
processing system can run with very high parallelism. If such a job comes down 
(say for a code push) it queues up data for processing, when it restarts it 
will potentially have a large backlog of data to process. Since the job may 
actually have very high parallelism this can result in huge load spikes, many 
orders of magnitude higher than steady state load. If this load is mixed with 
live queries (i.e. the queries used to build web pages or render mobile ui or 
anything else that has a user waiting on the other end) then you may end up 
causing a denial-of-service attack on your live service.
+1. **Isolation**: If your database or service is also running live processing, 
mixing in asynchronous processing can be quite dangerous. A scalable stream 
processing system can run with very high parallelism. If such a job comes down 
(say for a code push) it queues up data for processing, when it restarts it 
will potentially have a large backlog of data to process. Since the job may 
actually have very high parallelism this can result in huge load spikes, many 
orders of magnitude higher than steady state load. If this load is mixed with 
live queries (i.e. the queries used to build web pages or render mobile UI or 
anything else that has a user waiting on the other end) then you may end up 
causing a denial-of-service attack on your live service.
 1. **Query Capabilities**: Many scalable databases expose very limited query 
interfaces--only supporting simple key-value lookups. Doing the equivalent of a 
"full table scan" or rich traversal may not be practical in this model.
 1. **Correctness**: If your task keeps counts or otherwise modifies state in a 
remote store how is this rolled back if the task fails? 
 
@@ -78,28 +84,40 @@ You can think of this as taking the remote table out of the 
remote database and
 
 ![state-local](/img/0.7.0/learn/documentation/container/stateful_job.png)
 
-Note that now the state is physically on the same machine as the tasks, and 
each task has access only to its local partition. However the combination of 
stateful tasks with the normal partitioning capabilities Samza offers makes 
this a very general feature: in general you just repartition on the key by 
which you want to split your processing and then you have full local access to 
the data within storage in that partition.
-
-In cases where we were querying the external database on each input message to 
join on additional data for our output stream we would now instead create an 
input stream coming from the remote database that captures the changes to the 
database.
+Note that now the state is physically on the same machine as the tasks, and 
each task has access only to its local partition. However the combination of 
stateful tasks with the normal partitioning capabilities Samza offers makes 
this a very general feature: you just repartition on the key by which you want 
to split your processing and then you have full local access to the data within 
storage in that partition.
 
 Let's look at how this addresses the problems of the remote store:
 
 1. This fixes the performance issues of remote queries because the data is now 
local, what would otherwise be a remote query may now just be a lookup against 
local memory or disk (we ship a 
[LevelDB](https://code.google.com/p/leveldb)-based store which is described in 
detail below).
-1. The isolation issues goes away as well as the queries are executed against 
the same servers the job runs against and this computation is not intermixed 
with live service calls.
+1. The isolation issue goes away as well as the queries are executed against 
the same servers the job runs against and this computation is not intermixed 
with live service calls.
 1. Data is now local so any kind of data-intensive processing, scans, and 
filtering is now possible.
-1. The store can abide by the same delivery and fault-tolerance guarantees 
that the Samza task itself does.
+1. Since the state changes are themselves modeled as a stream the store can 
abide by the same delivery and fault-tolerance guarantees that Samza gives 
tasks.
+
+This isn't always the right pattern to follow, it has a few drawbacks too.
+1. If the data is very large then storing it with each task that uses the data 
may use more space.
+1. As the per-container data size grows so too will the restore time for a 
failed task (50Mb/sec is a reasonable restore time to expect).
+
+However we find that the local state approach is the best more often than not, 
and, of course, nothing prevents the use of external storage when needed.
+
+### Databases as input streams
+
+In cases where we were querying the external database on each input message we 
can transform this to local processing by instead transforming the database 
into a stream of row changes. These changes can be taken as input by a task, 
stored, and queried against just as the remote database would be.
+
+But how can you get such a stream? Many databases such Oracle, HBase, MySQL, 
and MongoDB offer built-in support for directly capturing changes. If not this 
can be done by publishing a stream of changes to Kafka or by implementing our 
[pluggable stream interface](streams.html) to directly poll the database for 
changes (say by some last_modified timestamp). You want this to be done in a 
way that you can reset the offset or timestamp back to zero to replay the 
current state of the database as changes if you need to reprocess. If this 
stream capture is efficient enough you can often avoid having changelogs for 
your tasks and simply replay them from the source when they fail.
+
+A wonderful contribution would be a generic jdbc-based stream implementation 
for extracting changes from relational databases by modified date.
 
 ### Key-value storage
 
-Though the storage format is pluggable, we provide a key-value store 
implementation to tasks out-of-the-box and gives the usual put/get/delete/range 
queries. This is backed by a highly available "changelog" stream that provides 
fault-tolerance by acting as a kind of [redo 
log](http://en.wikipedia.org/wiki/Redo_log) for the task's state (we describe 
this more in the next section).
+Though the storage format is pluggable, we provide a key-value store 
implementation to tasks out-of-the-box that gives the usual 
put/get/delete/range queries. This is backed by a highly available "changelog" 
stream that provides fault-tolerance by acting as a kind of [redo 
log](http://en.wikipedia.org/wiki/Redo_log) for the task's state (we describe 
this more in the next section).
 
-This key-value storage engine is built on top of 
[LevelDB](https://code.google.com/p/leveldb). LevelDB has several nice 
properties. First it maintains data outside the java heap which means it is 
immediately preferable to any simple approach using a hash table both because 
of memory-efficiency and to avoid GC. It will use an off-heap memory cache and 
when that is exhausted go to disk for lookups&mdash;so small data sets can be 
[very fast](https://code.google.com/p/leveldb) and non-memory-resident 
datasets, though slower, are still possible. It is 
[log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/)
 and writes can be performed at close to disk speeds. It also does built-in 
block compression which helps to reduce both I/O and memory usage.
+This key-value storage engine is built on top of 
[LevelDB](https://code.google.com/p/leveldb) using a [LevelDB JNI 
API](https://github.com/fusesource/leveldbjni). LevelDB has several nice 
properties. First it maintains data outside the java heap which means it is 
immediately preferable to any simple approach using a hash table both because 
of memory-efficiency and to avoid GC. It will use an off-heap memory cache and 
when that is exhausted go to disk for lookups&mdash;so small data sets can be 
[very fast](https://code.google.com/p/leveldb) and non-memory-resident 
datasets, though slower, are still possible. It is 
[log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/)
 and writes can be performed at close to disk speeds. It also does built-in 
block compression which helps to reduce both I/O and memory usage.
 
-The nature of Samza's usage allows us to optimize this further. We add an 
optional "L1" LRU cache which is in-heap and holds deserialized rows. This 
cache is meant to be very small and let's us introduce several optimizations 
for both reads and writes.
+The nature of Samza's usage allows us to optimize this further. We add an 
optional "L1" LRU cache which is in-heap and holds deserialized rows. This 
cache is meant to be very small and lets us introduce several optimizations for 
both reads and writes.
 
 The cache is an "object" or "row" cache&mdash;that is it maintains the java 
objects stored with no transformation or serialization. This complements 
LevelDB's own block level caching well. Reads and writes both populate the 
cache, and reads on keys in the cache avoid the cost of deserialization for 
these very common objects.
 
-For writes the cache provides two benefits. Since LevelDB is itself really 
only a persistent "cache" in our architecture we do not immediately need to 
apply every write to the filesystem. We can batch together a few hundred writes 
and apply them all at once. LevelDB heavily optimizes this kind of batch write. 
This does not impact consistency&mdash;a task always reads what it wrote (since 
it checks the cache first). Secondly the cache effectively deduplicates updates 
so that if multiple updates to the same key occur close together we can 
optimize away all but the final write to leveldb and the changelog. For 
example, an important use case is maintaining a small number of counters that 
are incremented on every input. A naive implementation would need to write out 
each new value to LevelDB as well as perhaps logging the change out to the 
changelog for the task. In the extreme case where you had only a single 
variable, x, incremented on each input, an uncached implementation would 
produce writes in the form "x=1", "x=2", "x=3", etc which is quite inefficient. 
This is overkill, we only need to flush to the changelog at [commit 
points](checkpointing.html) not on every write. This allows us to "deduplicate" 
the writes that go to leveldb and the changelog to just the final value before 
the commit point ("x=3" or whatever it happened to be).
+For writes the cache provides two benefits. Since LevelDB is itself really 
only a persistent "cache" in our architecture we do not immediately need to 
apply every write to the filesystem. We can batch together a few hundred writes 
and apply them all at once. LevelDB heavily optimizes this kind of batch write. 
This does not impact consistency&mdash;a task always reads what it wrote (since 
it checks the cache first and is the only writer to its store). Secondly the 
cache effectively deduplicates updates so that if multiple updates to the same 
key occur close together we can optimize away all but the final write to 
leveldb and the changelog. For example, an important use case is maintaining a 
small number of counters that are incremented on every input. A naive 
implementation would need to write out each new value to LevelDB as well as 
perhaps logging the change out to the changelog for the task. In the extreme 
case where you had only a single variable, x, incremented on each input, an 
uncached implementation would produce writes in the form "x=1", "x=2", "x=3", 
etc which is quite inefficient. This is overkill, we only need to flush to the 
changelog at [commit points](checkpointing.html) not on every write. This 
allows us to "deduplicate" the writes that go to leveldb and the changelog to 
just the final value before the commit point ("x=3" or whatever it happened to 
be).
 
 The combination of these features makes it possible to provide highly 
available processing that performs very close to memory speeds for small 
datasets yet still scales up to TBs of data (partitioned up across all the 
tasks).
 
@@ -111,7 +129,7 @@ The answer is that Samza handles state as just another 
stream. There are two mec
 
 The first approach is just to allow the task to replay one or more of its 
input streams to populate its store when it restarts. This works well if the 
input stream maintains the complete data (as a stream fed by a database table 
might) and if the input stream is fast enough to make this practical. This 
requires no framework support.
 
-However often the state that is stored is much smaller than the input stream 
(because is it an aggregation or projection of the original input streams). Or 
the input stream may not maintain a complete, replayable set of inputs (say for 
event logs). To support these cases we provide the ability to back the state of 
the store with a changelog stream. A changelog is just a stream to which the 
task logs each change to its state&mdash;i.e. the sequence of key-value pairs 
applied to the local store. Changelogs are co-partitioned with their tasks (so 
each task has its own stream partition for which it is the only writer).
+However often the state that is stored is much smaller than the input stream 
(because it is an aggregation or projection of the original input streams). Or 
the input stream may not maintain a complete, replayable set of inputs (say for 
event logs). To support these cases we provide the ability to back the state of 
the store with a changelog stream. A changelog is just a stream to which the 
task logs each change to its state&mdash;i.e. the sequence of key-value pairs 
applied to the local store. Changelogs are co-partitioned with their tasks (so 
each task has its own stream partition for which it is the only writer).
 
 The changelogs are just normal streams&mdash;other downstream tasks can 
subscribe to this state and use it. And it turns out that very often the most 
natural way to represent the output of a job is as the changelog of its task 
(we'll show some examples in a bit).
 
@@ -247,12 +265,6 @@ Note: In this example we are assuming that impressions are 
assigned a unique gui
 
 Implementation: Partition the ad click and ad impression streams by the 
impression id or user id. The task keeps a store of unmatched clicks and 
unmatched impressions. When a click comes in we try to find its matching 
impression in the impression store, and vice versa. If a match is found emit 
the joined pair and delete the entry. If no match is found store the event to 
wait for a match. Since this is presumably a left outer join (i.e. every click 
has a corresponding impression but not vice versa) we will periodically scan 
the impression table and delete old impressions for which no click arrived.
 
-### Databases as streams
-
-One assumption we are making in the above is that you can extract a stream of 
changes from your databases. This could be done by publishing a stream of 
changes to Kafka or by implementing our [pluggable stream 
interface](streams.html) to directly poll the database for changes. You want 
this to be done in a way that you can reset the offset or timestamp back to 
zero to replay the current state of the database as changes if you need to 
reprocess. If this stream capture is efficient enough you can often avoid 
having changelogs for your tasks and simply replay them from the source when 
they fail.
-
-A wonderful contribution would be a generic jdbc-based stream implementation 
for extracting changes from relational databases.
-
 ### Implementing storage engines
 
 We mentioned that the storage engine interface was pluggable. Of course you 
can use any data structure you like in your task provided you can repopulate it 
off your inputs on failure. However to plug into our changelog infrastructure 
you need to implement a generic StorageEngine interface that handles restoring 
your state on failure and ensures that data is flushed prior to commiting the 
task position.
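
The write deduplication described in the key-value storage section of the patched doc (the "x=1", "x=2", "x=3" counter case) can be illustrated with a small sketch. This is not Samza's implementation: `DedupingCache`, its `put`/`get`/`flush` methods, and the plain dict and list standing in for LevelDB and the changelog stream are all hypothetical names chosen for illustration.

```python
from collections import OrderedDict


class DedupingCache:
    """Hypothetical write-back cache in front of a store and a changelog."""

    def __init__(self, store, changelog, batch_size=100):
        self.store = store          # dict standing in for LevelDB (assumption)
        self.changelog = changelog  # list standing in for the changelog stream
        self.batch_size = batch_size
        self.dirty = OrderedDict()  # key -> latest value not yet flushed

    def put(self, key, value):
        # A later write to the same key replaces the pending one,
        # so only the final value is ever flushed.
        self.dirty[key] = value
        self.dirty.move_to_end(key)
        if len(self.dirty) >= self.batch_size:
            self.flush()

    def get(self, key):
        # Check pending writes first so the task always reads what it wrote.
        if key in self.dirty:
            return self.dirty[key]
        return self.store.get(key)

    def flush(self):
        # Called at commit points, or when the batch of dirty keys fills up.
        for key, value in self.dirty.items():
            self.store[key] = value
            self.changelog.append((key, value))
        self.dirty.clear()


store, changelog = {}, []
cache = DedupingCache(store, changelog)
for i in range(1, 4):
    cache.put("x", i)           # x=1, x=2, x=3
assert cache.get("x") == 3      # read-your-writes before any flush
cache.flush()                   # commit point
print(changelog)                # prints [('x', 3)]
```

Three increments of `x` reach the store and the changelog as a single write. This is safe only because the task is the sole writer to its store, which is the point the patched paragraph makes.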
