Updated Branches:
  refs/heads/master 8dcfbb8a5 -> d03cce078

Improve the state management documentation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d03cce07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d03cce07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d03cce07

Branch: refs/heads/master
Commit: d03cce078e9a03ec63c6e84ab531b18c36cffbf9
Parents: 8dcfbb8
Author: Jay Kreps <[email protected]>
Authored: Thu Sep 5 17:08:18 2013 -0700
Committer: Jay Kreps <[email protected]>
Committed: Thu Sep 5 17:08:18 2013 -0700

----------------------------------------------------------------------
 .../documentation/container/stateful_job.png    | Bin 0 -> 26142 bytes
 .../container/stream_job_and_db.png             | Bin 0 -> 30316 bytes
 .../0.7.0/container/state-management.md         | 245 +++++++++++++++----
 3 files changed, 201 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d03cce07/docs/img/0.7.0/learn/documentation/container/stateful_job.png
----------------------------------------------------------------------
diff --git a/docs/img/0.7.0/learn/documentation/container/stateful_job.png 
b/docs/img/0.7.0/learn/documentation/container/stateful_job.png
new file mode 100644
index 0000000..e486079
Binary files /dev/null and 
b/docs/img/0.7.0/learn/documentation/container/stateful_job.png differ

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d03cce07/docs/img/0.7.0/learn/documentation/container/stream_job_and_db.png
----------------------------------------------------------------------
diff --git a/docs/img/0.7.0/learn/documentation/container/stream_job_and_db.png 
b/docs/img/0.7.0/learn/documentation/container/stream_job_and_db.png
new file mode 100644
index 0000000..8ad0af3
Binary files /dev/null and 
b/docs/img/0.7.0/learn/documentation/container/stream_job_and_db.png differ

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d03cce07/docs/learn/documentation/0.7.0/container/state-management.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/container/state-management.md 
b/docs/learn/documentation/0.7.0/container/state-management.md
index a65a3db..c23c7c3 100644
--- a/docs/learn/documentation/0.7.0/container/state-management.md
+++ b/docs/learn/documentation/0.7.0/container/state-management.md
@@ -3,60 +3,168 @@ layout: page
 title: State Management
 ---
 
-Samza allows tasks to maintain persistent, mutable state that is physically 
co-located with each task. The state is highly available: in the event of a 
task failure it will not be lost when the task fails over to another machine.
+One of the more interesting aspects of Samza is the ability for tasks to store 
data locally and execute rich queries against it.
 
-A key-value store implementation is provided out of the box that covers many 
use cases. Other store implementations can be plugged in for different types of 
storage.
+Of course simple filtering or single-row transformations can be done without any need for collecting state. A simple analogy to SQL may make this more obvious. The select- and where-clauses of a SQL query don't usually require state: these can be executed a row at a time on input data without maintaining state between rows. The rest of SQL, multi-row aggregations and joins, requires more support to execute correctly in a streaming fashion. Samza doesn't provide a high-level language like SQL but it does provide lower-level primitives that make streaming aggregation, joins, and other stateful processing easy to implement.
 
-State is naturally partitioned with the tasks, with one store per task. When 
there is a backing changelog, the stream will also be co-partitioned with the 
tasks. Possible extensions to handle non-partitioned state (i.e. a global 
lookup dictionary) are discussed at the end.
+Let's dive into how this works and why it is useful.
 
-Restoring state can be done either by having a dedicated stream that captures 
the changes to the local store, or by rebuilding the state off the input 
streams.
+### Common use cases for stateful processing
 
-### Use Cases
+First, let's look at some simplistic examples of stateful stream processing 
that might be seen on a consumer website.
 
-We have a few use-cases in mind for this functionality.
+##### Windowed aggregation
 
-#### Windowed Aggregation
+Example: Counting the number of page views for each user per hour
 
-Example: Counting member page views by hour
+This kind of windowed processing is common for ranking and relevance, 
"trending topics", as well as simple real-time reporting and monitoring.
 
-Implementation: The stream is partitioned by the aggregation key (member\_id). 
Each new input record would cause the job to retrieve and update the aggregate 
(the page view count). When the window is complete (i.e. the hour is over), the 
job outputs the current stored aggregate value.
+##### Table-table join
 
-####Table-Table Join
+Example: Join a table of user profiles to a table of user\_settings by 
user\_id and emit the joined stream
 
-Example: Join profile to user\_settings by member\_id and emit the joined 
stream
+This example is somewhat simplistic: one might wonder why you would want to join two tables in a stream processing system. However consider a more realistic example: real-time data normalization. E-commerce companies need to handle product imports, web-crawlers need to update their [database of the web](labs.yahoo.com/files/YahooWebmap.pdf), and social networks need to normalize and index social data for search. Each of these processing flows is immensely complex and contains many processing stages that effectively join together and normalize many data sources into a single clean feed.
 
-Implementation: The job subscribes to the change stream for profile and for 
user\_settings both partitioned by member\_id. The job keeps a local store 
containing both the profile and settings data. When a record comes in from 
either profile or settings, the job looks up the value for that member and 
updates the appropriate section (either profile or settings). The changelog for 
the local store can be used as the output stream if the desired output stream 
is simply the join of the two inputs.
+##### Table-stream join
 
-#### Table-Stream Join
+Example: Join user region information to page view data
 
-Example: Join member geo region to page view data
+Joining side-information to a real-time feed is a classic use for stream processing. It's particularly common in advertising, relevance ranking, fraud detection and other domains. Activity data such as page views is generally captured with only a few primary keys; the additional attributes about the viewer and viewed items that are needed for processing need to be joined in after the fact.
 
-Implementation: The job subscribes to the profile stream (for geo) and page 
views stream, both partitioned by member\_id. It keeps a local store of 
member\_id => geo that it updates off the profile feed. When a page view 
arrives it does a lookup in this store to join on the geo data.
+##### Stream-stream join
 
-#### Stream-Stream Join
+Example: Join a stream of ad clicks to a stream of ad views to link the ad view that led to the click
 
-Example: Join ad clicks to ad impressions by some shared key
+This is the classic stream join for "nearly aligned" streams. If the events 
that need to be joined arrive in a limited window it may be possible to buffer 
unjoined events in memory. Obviously this will be only approximate: any 
in-flight items will be lost if a machine crashes. However for more exact 
results, or to handle a very large window of misalignment, stateful processing 
is needed.
 
-Implementation: Partition ad click and ad impression by the join key. Keep a 
store of unmatched clicks and unmatched impressions. When a click comes in try 
to find its matching impression in the impression store, and vice versa when an 
impression comes in check the click store. If a match is found emit the joined 
pair and delete the entry. If no match is found store the event to wait for a 
match. Since this is presumably a left outer join (i.e. every click has a 
corresponding impression but not vice versa) we will periodically scan the 
impression table and delete old impressions for which no click arrived.
+##### More
 
-#### More
+Of course there are infinite variations on joins and aggregations, but most 
amount to essentially variations and combinations of the above patterns.
 
-Of course there are infinite variations on joins and aggregations, but most 
amount to essentially variations on the above.
+### Approaches to managing task state
 
-### Usage
+So how do systems support this kind of stateful processing? We'll lead in by 
describing what we have seen in other systems and then describe what Samza does.
+
+#### In-memory state with checkpointing
+
+A simple approach common in academic stream processing systems is to periodically save out the state of the task's in-memory data. S4's [state management](http://incubator.apache.org/s4/doc/0.6.0/fault_tolerance) implements this approach&mdash;tasks implement Java's Serializable interface and are periodically serialized using Java serialization to save out copies of the processor state.
+
+This approach works well enough if the in-memory state consists of only a few values. However, since you have to save out the complete task state on each checkpoint, this becomes increasingly expensive as task state grows. Unfortunately most use cases we have seen revolve around joins and aggregation and so have large amounts of state&mdash;often many gigabytes. This makes periodic full dumps extremely impractical. Some academic systems handle this case by having the tasks produce "diffs" in addition to full checkpoints. However this requires a great deal of complexity in the task to track what has changed and efficiently produce a compact diff of changes.
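+
+To make the cost concrete, here is a minimal sketch of this approach (illustrative only&mdash;not Samza or S4 code): the entire state object graph is rewritten on every checkpoint, so checkpoint time grows linearly with the size of the state.
+
+    import java.io.FileOutputStream;
+    import java.io.IOException;
+    import java.io.ObjectOutputStream;
+    import java.io.Serializable;
+    import java.util.HashMap;
+    import java.util.Map;
+
+    // Hypothetical task that checkpoints its full in-memory state with Java serialization.
+    public class CheckpointedTask implements Serializable {
+      private final Map<String, Long> counts = new HashMap<String, Long>();
+
+      public void checkpoint(String path) throws IOException {
+        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(path));
+        try {
+          out.writeObject(counts); // serializes *all* state, gigabytes included
+        } finally {
+          out.close();
+        }
+      }
+    }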
+
+#### Using an external store
+
+In the absence of built-in support a common pattern for stateful processing is 
to push any state that would be accumulated between rows into an external 
database or key-value store. You get something that looks like this:
+
+![state-kv-store](/img/0.7.0/learn/documentation/container/stream_job_and_db.png)
+
+Samza allows this style of processing (nothing will stop you from querying a 
remote database or service from your job) but also supports stateful processing 
natively in a way we think is often superior.
+
+#### The problems of remote stores
+
+To understand why this is useful let's first understand some of the drawbacks 
of making remote queries in a stream processing job:
+
+1. **Performance**: The first major drawback of making remote queries is that 
they are slow and expensive. A Kafka stream can deliver hundreds of thousands 
or even millions of messages per second per CPU core because it transfers large 
chunks of data at a time. But a remote database query is a more expensive 
proposition. Though the database may be partitioned and scalable this 
partitioning doesn't match the partitioning of the job into tasks so batching 
becomes much less effective. As a result you would expect to get a few thousand 
queries per second per core for remote requests. This means that adding a 
processing stage that uses an external database will often reduce the 
throughput by several orders of magnitude.
+1. **Isolation**: If your database or service is also serving live traffic, mixing in asynchronous processing load can be quite dangerous. A scalable stream processing system can run with very high parallelism. If such a job comes down (say for a code push) it queues up data for processing; when it restarts it will potentially have a large backlog of data to process. Since the job may actually have very high parallelism this can result in huge load spikes, many orders of magnitude higher than steady-state load. If this load is mixed with live queries (i.e. the queries used to build web pages or render a mobile UI or anything else that has a user waiting on the other end) then you may end up causing a denial-of-service attack on your live service.
+1. **Query Capabilities**: Many scalable databases expose very limited query interfaces&mdash;only supporting simple key-value lookups. Doing the equivalent of a "full table scan" or rich traversal may not be practical in this model.
+1. **Correctness**: If your task keeps counts or otherwise modifies state in a 
remote store how is this rolled back if the task fails? 
+
+Where these issues become particularly problematic is when you need to 
reprocess data. Your output, after all, is a combination of your code and your 
input&mdash;when you change your code you often want to reprocess input to 
recreate the output state with the new improved code. This is generally quite 
reasonable for pure stream processing jobs, but generally impractical for 
performance and isolation reasons for jobs that make external queries.
+
+### Local state in Samza
+
+Samza allows tasks to maintain persistent, mutable, queryable state that is 
physically co-located with each task. The state is highly available: in the 
event of a task failure it will be restored when the task fails over to another 
machine.
+
+You can think of this as taking the table out of the remote database, partitioning it up, and co-locating these partitions with the tasks. The result looks something like this:
+
+![state-local](/img/0.7.0/learn/documentation/container/stateful_job.png)
+
+Note that now the state is physically on the same machine as the tasks, and each task has access only to its local partition. However the combination of stateful tasks with the normal partitioning capabilities Samza offers makes this a very general feature: you just repartition on the key by which you want to split your processing and then you have full local access to the data stored in that partition.
+
+Let's look at how this addresses the problems of the remote store:
+
+1. This fixes the performance issues of remote queries because the data is now local: what would otherwise be a remote query may now be just a lookup against local memory or disk (we ship a [LevelDB](https://code.google.com/p/leveldb)-based store which is described in detail below).
+1. The isolation issue goes away as well, since the queries are executed on the same servers the job runs on and this computation is not intermixed with live service calls.
+1. Data is now local so any kind of data-intensive processing, scans, and 
filtering is now possible.
+1. The store can abide by the same delivery and fault-tolerance guarantees 
that the Samza task itself does.
+
+### Key-value storage
+
+Though the storage format is pluggable, we provide tasks with a key-value store implementation out of the box that gives the usual put/get/delete/range queries. This is backed by a highly available "changelog" stream that provides fault-tolerance by acting as a kind of [redo log](http://en.wikipedia.org/wiki/Redo_log) for the task's state (we describe this more in the next section).
+
+This key-value storage engine is built on top of [LevelDB](https://code.google.com/p/leveldb). LevelDB has several nice properties. First it maintains data outside the Java heap, which means it is immediately preferable to any simple approach using a hash table, both for memory efficiency and to avoid GC pauses. It will use an off-heap memory cache and, when that is exhausted, go to disk for lookups&mdash;so small data sets can be [very fast](https://code.google.com/p/leveldb) and non-memory-resident datasets, though slower, are still possible. It is [log-structured](http://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/) so writes can be performed at close to disk speeds. It also does built-in block compression which helps to reduce both I/O and memory usage.
+
+The nature of Samza's usage allows us to optimize this further. We add an optional "L1" LRU cache which is in-heap and holds deserialized rows. This cache is meant to be very small and lets us introduce several optimizations for both reads and writes.
+
+The cache is an "object" or "row" cache&mdash;that is, it maintains the Java objects as stored with no transformation or serialization. This complements LevelDB's own block-level caching well. Reads and writes both populate the cache, and reads on keys in the cache avoid the cost of deserialization for these very common objects.
+
+For writes the cache provides two benefits. Since LevelDB is itself really only a persistent "cache" in our architecture, we do not immediately need to apply every write to the filesystem. We can batch together a few hundred writes and apply them all at once. LevelDB heavily optimizes this kind of batch write. This does not impact consistency&mdash;a task always reads what it wrote (since it checks the cache first). Secondly, the cache effectively deduplicates updates so that if multiple updates to the same key occur close together we can optimize away all but the final write to LevelDB and the changelog. For example, an important use case is maintaining a small number of counters that are incremented on every input. A naive implementation would need to write out each new value to LevelDB as well as perhaps logging the change to the changelog for the task. In the extreme case where you had only a single variable, x, incremented on each input, an uncached implementation would produce writes of the form "x=1", "x=2", "x=3", etc., which is quite inefficient. This is overkill: we only need to flush to the changelog at [commit points](checkpointing.html), not on every write. This allows us to "deduplicate" the writes that go to LevelDB and the changelog to just the final value before the commit point ("x=3" or whatever it happened to be).
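+
+As an illustration, here is a simplified sketch of such a deduplicating write cache (the names and structure are hypothetical; the real store also handles LRU eviction, serialization, and reads that miss the cache):
+
+    import java.util.LinkedHashMap;
+    import java.util.Map;
+
+    // Sketch: buffer writes in-heap; later writes to a key replace earlier ones.
+    public class DedupingWriteCache<K, V> {
+      private final Map<K, V> dirty = new LinkedHashMap<K, V>();
+      private final int batchSize;
+
+      public DedupingWriteCache(int batchSize) { this.batchSize = batchSize; }
+
+      public void put(K key, V value) {
+        dirty.put(key, value); // "x=1" then "x=3" leaves only "x=3" to flush
+        if (dirty.size() >= batchSize) flush();
+      }
+
+      public V get(K key) { return dirty.get(key); } // the task always reads its own writes
+
+      public void flush() { // called when the batch fills up and at commit points
+        for (Map.Entry<K, V> e : dirty.entrySet()) {
+          // Only the final value per key reaches LevelDB and the changelog, e.g.:
+          // levelDb.put(e.getKey(), e.getValue()); changelog.send(e.getKey(), e.getValue());
+        }
+        dirty.clear();
+      }
+    }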
+
+The combination of these features makes it possible to provide highly 
available processing that performs very close to memory speeds for small 
datasets yet still scales up to TBs of data (partitioned up across all the 
tasks).
+
+### Fault-tolerance
+
+As mentioned the actual local storage (i.e. LevelDB for key-value storage) is 
really just a cache. How can we ensure that this data is not lost when a 
machine fails and the tasks running on that machine have to be brought up on 
another machine (which, of course, doesn't yet have the local persistent state)?
+
+The answer is that Samza handles state as just another stream. There are two 
mechanisms for accomplishing this.
+
+The first approach is just to allow the task to replay one or more of its 
input streams to populate its store when it restarts. This works well if the 
input stream maintains the complete data (as a stream fed by a database table 
might) and if the input stream is fast enough to make this practical. This 
requires no framework support.
+
+However often the state that is stored is much smaller than the input stream (because it is an aggregation or projection of the original input streams). Or the input stream may not maintain a complete, replayable set of inputs (say for event logs). To support these cases we provide the ability to back the state of the store with a changelog stream. A changelog is just a stream to which the task logs each change to its state&mdash;i.e. the sequence of key-value pairs applied to the local store. Changelogs are co-partitioned with their tasks (so each task has its own stream partition for which it is the only writer).
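+
+Conceptually the logged store just mirrors every mutation to its changelog partition before applying it locally. A rough sketch (illustrative; ChangelogProducer is a hypothetical stand-in for the real stream producer):
+
+    // Sketch of a store wrapper that treats the changelog as a redo log.
+    public class LoggedStore<K, V> {
+      private final KeyValueStore<K, V> local;         // e.g. the LevelDB-backed store
+      private final ChangelogProducer<K, V> changelog; // hypothetical: writes this task's partition
+
+      public LoggedStore(KeyValueStore<K, V> local, ChangelogProducer<K, V> changelog) {
+        this.local = local;
+        this.changelog = changelog;
+      }
+
+      public void put(K key, V value) {
+        changelog.send(key, value); // log the change...
+        local.put(key, value);      // ...then apply it to the local store
+      }
+
+      public void delete(K key) {
+        changelog.send(key, null);  // null marks a deletion (removable later by log compaction)
+        local.delete(key);
+      }
+    }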
+
+The changelogs are just normal streams&mdash;other downstream tasks can 
subscribe to this state and use it. And it turns out that very often the most 
natural way to represent the output of a job is as the changelog of its task 
(we'll show some examples in a bit).
+
+Of course a log of changes only grows over time so this would soon become impractical. Kafka, however, has [log-compaction](https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction) which provides special support for exactly this kind of use case. This feature allows Kafka to compact duplicate entries (i.e. multiple updates with the same key) in the log rather than just deleting old log segments. This feature is new: it is in trunk and will be released as part of the 0.8.1 release.
+
+The Kafka brokers scale well up to terabytes of data per machine for changelogs, just as for other topics. Log compaction proceeds at about 50MB/sec/core or whatever the I/O limits of the broker are.
+
+### Other storage engines
+
+One interesting aspect of this design is that the fault-tolerance mechanism is completely decoupled from the query APIs the storage engine provides to the task or the way it stores data on disk or in memory. We have provided a key-value index with Samza, but you can easily implement storage engines that are optimized for other types of queries and plug them in to our fault-tolerance mechanism to provide different query capabilities to your tasks.
+
+Here are a few examples of storage engine types we think would be interesting 
to pursue in the future (patches accepted!):
+
+##### Persistent heap
+
+A common operation in stream processing is to maintain a running top-N. There 
are two primary applications of this. The first is ranking items over some 
window. The second is performing a "bounded sort" operation to transform a 
nearly sorted input stream into a totally sorted output stream. This occurs 
when dealing with a data stream where the order is by arrival and doesn't 
exactly match the source timestamp (for example log events collected across 
many machines).
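+
+For example, the bounded sort amounts to draining the minimum out of a size-limited heap; a persistent heap engine would add changelog-backed durability to a structure like this:
+
+    import java.util.PriorityQueue;
+
+    // Sketch: re-sort a nearly ordered stream using a bounded min-heap of timestamps.
+    public class BoundedSorter {
+      private final PriorityQueue<Long> heap = new PriorityQueue<Long>();
+      private final int maxBuffered; // must exceed the maximum expected disorder
+
+      public BoundedSorter(int maxBuffered) { this.maxBuffered = maxBuffered; }
+
+      // Feed events in arrival order; returns the next event in sorted order, or null.
+      public Long process(long timestamp) {
+        heap.add(timestamp);
+        return heap.size() > maxBuffered ? heap.poll() : null;
+      }
+    }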
+
+##### Sketches
+
+Many applications don't require exact results, and for these, [approximate algorithms](http://infolab.stanford.edu/~ullman/mmds/ch4.pdf) such as [bloom filters](http://en.wikipedia.org/wiki/Bloom_filter) for set membership, [hyperloglog](http://research.google.com/pubs/pub40671.html) for counting distinct keys, and a multitude of algorithms for quantile and histogram approximation can provide good estimates using a small fraction of the state.
+
+These algorithms are inherently approximate but good algorithms give a strong 
bound on the accuracy of the approximation. This obviously doesn't carry over 
well to the case where the task can crash and lose all state. By logging out 
the changes to the structure we can ensure it is restored on fail-over. The 
nature of sketch algorithms allows significant opportunity for optimization in 
the form of logging. 
+
+##### Inverted index
+
+Inverted indexes such as the one provided by [Lucene](http://lucene.apache.org) are common for text matching and other applications that do matching and ranking with selective queries and large result sets.
+
+##### More
+
+There are a variety of other storage engines that could be useful:
+
+* For small datasets, logged in-memory collections may be ideal.
+* Specialized data structures for graph traversal are common.
+* Many applications are doing OLAP-like aggregations on their input. It might 
be possible to optimize these kinds of dimensional summary queries.
+
+### Using the key-value store
+
+In this section we will give a quick tutorial on configuring and using the 
key-value store.
 
 To declare a new store for usage you add the following to your job config:
 
-    # Use the key-value store implementation for 
+    # Use the key-value store implementation for a store called "my-store"
     stores.my-store.factory=samza.storage.kv.KeyValueStorageEngineFactory
-    # Log changes to the store to a stream
+
+    # Log changes to the store to an output stream for restore
+    # If no changelog is specified the store will not be logged (but you can 
still rebuild off your input streams)
     stores.my-store.changelog=my-stream-name
+
+    # The system to use for the changelog stream
+    stores.my-store.system=kafka
+
     # The serialization format to use
     stores.my-store.serde=string
-    # The system to use for the changelog
-    stores.my-store.system=kafka
 
-Example code:
+Here is some simple example code that only writes to the store:
 
     public class MyStatefulTask implements StreamTask, InitableTask {
       private KeyValueStore<String, String> store;
@@ -71,7 +179,7 @@ Example code:
       }
     }
 
-This shows the put() API, but KeyValueStore gives a fairly general key-value 
interface:
+Here is the complete key-value store API:
 
     public interface KeyValueStore<K, V> {
       V get(K key);
@@ -82,7 +190,70 @@ This shows the put() API, but KeyValueStore gives a fairly general key-value int
       KeyValueIterator<K,V> all();
     }
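+
+For example, a full scan over the store might look like this (assuming the iterator yields Entry objects and should be closed to release store resources):
+
+    KeyValueIterator<String, String> iter = store.all();
+    try {
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        System.out.println(entry.getKey() + " => " + entry.getValue());
+      }
+    } finally {
+      iter.close(); // iterators may hold LevelDB resources
+    }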
 
-### Implementing Storage Engines
+Here is a list of additional configurations accepted by the key-value store 
along with their default values:
+
+    # The number of writes to batch together
+    stores.my-store.batch.size=500
+
+    # The total number of objects to cache in the "L1" object cache. This must 
be at least as large as the batch.size.
+    # A cache size of 0 disables all caching and batching.
+    stores.my-store.object.cache.size=1000
+
+    # The size of the off-heap leveldb block cache in bytes
+    stores.my-store.leveldb.block.cache.size=16777216
+
+    # Enable block compression using snappy?
+    stores.my-store.leveldb.compress=true
+
+    # The remaining options are a bit low-level and likely you don't need to 
change them unless you are a compulsive fiddler
+
+    # The leveldb block size (see leveldb docs).
+    stores.my-store.leveldb.block.size=4096
+
+    # The amount of memory leveldb uses for buffering the latest segment, also 
the size of leveldb's segment files. 
+    stores.my-store.leveldb.write.buffer.size=16777216
+
+### Implementing common use cases with the key-value store
+
+Let's look at how you can address some of the common use-cases we discussed 
before using the key-value storage engine.
+
+##### Windowed aggregation
+
+Example: Counting the number of page views for each user per hour
+
+Implementation: We have two processing stages. The first partitions the input data by user\_id (if the input is already partitioned by user\_id, which would be reasonable, you can skip this), and the second stage does the counting. The job has a single store containing the mapping of user\_id to the running count. Each new input record causes the job to retrieve the current running count, increment it, and write back the new count. When the window is complete (i.e. the hour is over), we iterate over the contents of our store and emit the aggregates.
+
+One thing to note is that this job effectively pauses at the hour mark to 
output its results. This is unusual for stream processing, but totally fine for 
Samza&mdash;and we have specifically designed for this case. Scans over the 
contents of the key-value store will be quite fast and input data will buffer 
while the job is doing this scanning and emitting aggregates.
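+
+A sketch of the counting stage might look like the following (we assume a periodic window() callback in the style of WindowableTask, and the output stream name is illustrative; treat the details as a sketch rather than exact API usage):
+
+    public class PageViewCounterTask implements StreamTask, InitableTask, WindowableTask {
+      private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "page-view-counts");
+      private KeyValueStore<String, String> store;
+
+      public void init(Config config, TaskContext context) {
+        this.store = (KeyValueStore<String, String>) context.getStore("my-store");
+      }
+
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String userId = (String) envelope.getKey();
+        String count = store.get(userId);
+        store.put(userId, count == null ? "1" : Long.toString(Long.parseLong(count) + 1));
+      }
+
+      public void window(MessageCollector collector, TaskCoordinator coordinator) {
+        // The hour is over: scan the store, emit each aggregate, and reset the counts.
+        KeyValueIterator<String, String> iter = store.all();
+        while (iter.hasNext()) {
+          Entry<String, String> entry = iter.next();
+          collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, entry.getKey(), entry.getValue()));
+          store.delete(entry.getKey()); // a real job might buffer keys and delete after closing the iterator
+        }
+        iter.close();
+      }
+    }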
+
+##### Table-table join
+
+Example: Join a table of user profiles to a table of user settings by user\_id 
and emit the joined stream
+
+Implementation: The job subscribes to the change stream for user profiles and 
for user settings databases, both partitioned by user\_id. The job keeps a 
single key-value store keyed by user\_id containing the joined contents of 
profiles and settings. When a new record comes in from either stream it looks 
up the current value in its store and writes back the record with the 
appropriate fields updated (i.e. new profile fields if it was a profile update, 
and new settings fields if it was a settings update). The changelog of the 
store doubles as the output stream of the task.
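+
+A sketch of the join task (illustrative; note this assumes a map-valued serde for the store rather than the string serde shown in the configuration above):
+
+    public class ProfileSettingsJoinTask implements StreamTask, InitableTask {
+      private KeyValueStore<String, Map<String, Object>> store;
+
+      public void init(Config config, TaskContext context) {
+        this.store = (KeyValueStore<String, Map<String, Object>>) context.getStore("my-store");
+      }
+
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String userId = (String) envelope.getKey();
+        Map<String, Object> update = (Map<String, Object>) envelope.getMessage(); // profile or settings fields
+        Map<String, Object> joined = store.get(userId);
+        if (joined == null)
+          joined = new HashMap<String, Object>();
+        joined.putAll(update);     // overwrite just the fields this update carries
+        store.put(userId, joined); // the store's changelog doubles as the joined output stream
+      }
+    }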
+
+##### Table-stream join
+
+Example: Join user zip code to page view data (perhaps to allow aggregation by 
zip code in a later stage)
+
+Implementation: The job subscribes to the user profile stream and page view 
stream. Each time it gets a profile update it stores the zipcode keyed by 
user\_id. Each time a page view arrives it looks up the zip code for the user 
and emits the enriched page view + zipcode event.
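+
+A sketch of the join logic (the input stream name and the two helper methods are hypothetical):
+
+    public class PageViewZipCodeJoinTask implements StreamTask, InitableTask {
+      private KeyValueStore<String, String> zipCodes;
+
+      public void init(Config config, TaskContext context) {
+        this.zipCodes = (KeyValueStore<String, String>) context.getStore("my-store");
+      }
+
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String userId = (String) envelope.getKey();
+        if ("user-profiles".equals(envelope.getSystemStreamPartition().getStream())) {
+          zipCodes.put(userId, extractZipCode(envelope));      // remember the latest zip code per user
+        } else {
+          String zipCode = zipCodes.get(userId);               // may be null if no profile was seen yet
+          collector.send(enrichedPageView(envelope, zipCode)); // emit the page view + zipcode event
+        }
+      }
+    }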
+
+##### Stream-stream join
+
+Example: Join ad clicks to ad impressions by impression id (an impression is 
advertising terminology for the event that records the display of an ad)
+
+Note: In this example we are assuming that impressions are assigned a unique guid and this is present in both the original impression event and any subsequent click. In the absence of this, the business logic for choosing the join would replace the simple lookup.
+
+Implementation: Partition the ad click and ad impression streams by the 
impression id or user id. The task keeps a store of unmatched clicks and 
unmatched impressions. When a click comes in we try to find its matching 
impression in the impression store, and vice versa. If a match is found emit 
the joined pair and delete the entry. If no match is found store the event to 
wait for a match. Since this is presumably a left outer join (i.e. every click 
has a corresponding impression but not vice versa) we will periodically scan 
the impression table and delete old impressions for which no click arrived.
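+
+A sketch of the core matching logic, assuming two stores ("clicks" and "impressions") declared in the job config and a hypothetical joinedEvent() helper that builds the output envelope:
+
+    public class AdClickJoinTask implements StreamTask, InitableTask {
+      private KeyValueStore<String, String> clicks;
+      private KeyValueStore<String, String> impressions;
+
+      public void init(Config config, TaskContext context) {
+        this.clicks = (KeyValueStore<String, String>) context.getStore("clicks");
+        this.impressions = (KeyValueStore<String, String>) context.getStore("impressions");
+      }
+
+      public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+        String impressionId = (String) envelope.getKey();
+        boolean isClick = "ad-clicks".equals(envelope.getSystemStreamPartition().getStream());
+        KeyValueStore<String, String> mine = isClick ? clicks : impressions;
+        KeyValueStore<String, String> other = isClick ? impressions : clicks;
+
+        String match = other.get(impressionId);
+        if (match != null) {
+          collector.send(joinedEvent(impressionId, match, envelope)); // emit the joined pair
+          other.delete(impressionId);
+        } else {
+          mine.put(impressionId, (String) envelope.getMessage()); // buffer until the other side arrives
+        }
+      }
+    }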
+
+### Databases as streams
+
+One assumption we are making in the above is that you can extract a stream of changes from your databases. This could be done by publishing a stream of changes to Kafka or by implementing our [pluggable stream interface](streams.html) to directly poll the database for changes. You want this to be done in a way that lets you reset the offset or timestamp back to zero to replay the current state of the database as changes if you need to reprocess. If this stream capture is efficient enough you can often avoid having changelogs for your tasks and simply rebuild their state from the source streams when they fail.
+
+A wonderful contribution would be a generic jdbc-based stream implementation 
for extracting changes from relational databases.
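+
+As a starting point, the core of such an implementation is just a polling loop keyed off a modification column. A crude sketch (the table and column names are illustrative, and real change capture must also deal with deletes and schema differences):
+
+    import java.sql.Connection;
+    import java.sql.PreparedStatement;
+    import java.sql.ResultSet;
+    import java.sql.SQLException;
+
+    // Sketch: poll a table for rows modified since the last offset we saw.
+    public class JdbcChangePoller {
+      public long poll(Connection conn, long lastModified) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement(
+            "SELECT user_id, zip_code, modified_at FROM user_profiles WHERE modified_at > ? ORDER BY modified_at");
+        stmt.setLong(1, lastModified);
+        ResultSet rs = stmt.executeQuery();
+        while (rs.next()) {
+          // hand each change to the stream consumer, e.g. emit(rs.getString("user_id"), rs.getString("zip_code"))
+          lastModified = Math.max(lastModified, rs.getLong("modified_at"));
+        }
+        rs.close();
+        stmt.close();
+        return lastModified; // resetting this offset to zero replays the whole table as changes
+      }
+    }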
+
+### Implementing storage engines
+
+We mentioned that the storage engine interface was pluggable. Of course you can use any data structure you like in your task provided you can repopulate it off your inputs on failure. However to plug into our changelog infrastructure you need to implement a generic StorageEngine interface that handles restoring your state on failure and ensures that data is flushed prior to committing the task position.
 
 The above code shows usage of the key-value storage engine, but it is not too 
hard to implement an alternate storage engine. To do so, you implement methods 
to restore the contents of the store from a stream, flush any cached content on 
commit, and close the store:
 
@@ -94,22 +265,8 @@ The above code shows usage of the key-value storage engine, but it is not too ha
 
 The user specifies the type of storage engine they want by passing in a 
factory for that store in their configuration.
 
-### Fault Tolerance Semantics with State
-
-Samza currently only supports at-least-once delivery guarantees. We will 
extend this to exact atomic semantics across outputs to multiple 
streams/partitions in the future.
-
-<!-- TODO add fault tolerance semantics SEP link when one exists
-The most feasible plan for exact semantics seems to me to be journalling 
non-deterministic decisions proposal outlined in the fault-tolerance semantics 
wiki. I propose we use that as a working plan.
-
-To ensure correct semantics in the presence of faults we need to ensure that 
the task restores to the exact state at the time of the last commit.
-
-If the task is fed off replayable inputs then it can simply replay these 
inputs to recreate its state.
-
-If the task has a changelog to log its state then there is the possibility 
that the log contains several entries beyond the last commit point. The store 
should only restore up to the last commit point to ensure that the state is in 
the correct position with respect to the inputs–the remaining changelog will 
then be repeated and de-duplicated as the task begins executing.
--->
-
-### Shared State
+### Fault tolerance semantics with state
 
-Originally we had discussed possibly allowing some facility for global lookup 
dictionaries that are un-partitioned; however, this does not work with our 
fault-tolerance semantics proposal, as the container-wide state changes out of 
band (effectively acting like a separate database or service). This would not 
work with proposed message de-duplication features since the task output is not 
deterministic.
+Samza currently only supports at-least-once delivery guarantees in the presence of failure (this is sometimes referred to as "guaranteed delivery"). This means messages are not lost, but if a task fails some messages may be redelivered. The guarantee holds from the commit point of the task, which records the position from which the task will restart on failure (the user can force a commit at convenient points, or the framework will do this by default at regular intervals). This is true for input, output, and changelog streams. This is a fairly weak guarantee&mdash;duplicates can give incorrect results in counts, for example. We have a plan to extend this to exact semantics in the presence of failure which we will include in a future release.
 
 ## [Metrics &raquo;](metrics.html)
