Hey Stone,

A Samza task's state is persisted to a stream. For example, if you use the 
key/value store, calling "write" will trigger a send to the change log stream 
for the store. Note that it's not required that you use a key/value store, this 
is just what comes out of the box. You can plug in other stores (Lucene, bloom 
filter, etc), if you wish using the StorageEngine API.

If Kafka is used as a system for the change log stream, the topic that's 
hosting the change log can be "de-duplicated". We call this "log compaction". 
If you imagine calling put(A, 1), then put (A, 2), it's possible to discard the 
put(A, 1) message, since it's been over-written by the put(A, 2) message. This 
prevents the change log stream from growing unbounded. Kafka will periodically 
delete messages for keys that have been over-written farther down the log. Note 
that log compaction is only enabled in Kafka's trunk codebase right now (which 
will likely become version 0.8.1).

When a failure occurs, the container hosting the task can be restarted 
anywhere. When it restarts, the container will read the state streams for the 
tasks that it hosts from offset 0 (the beginning of the stream) all the way to 
the end. As it reads, it will feed the messages into the state store. By the 
time the end of the stream has been reached, the state store will be fully 
restored. This is done using the StorageEngine.restore method.

One trade-off here is that the larger the state is, the longer it will take for 
a task to restore, since more messages will need to be read and written to the 
local store. A work-around for this is to that you can always increase the 
number of containers to get more parallelism, which should decrease the state 
size/container count.

Cheers,
Chris
________________________________________
From: Stone [[email protected]]
Sent: Friday, August 23, 2013 8:27 PM
To: [email protected]
Subject: Re: How does Samza overcome the topics # limititions of Kafka

Thanks for the clarification.

BTW: Another question:
http://samza.incubator.apache.org/learn/documentation/0.7.0/comparisons/introduction.html

In the Sate section of the Samza intro, it says that Samza Tasks can create
and restore state from local storage (leveldb) , but how does Samza ensure
that the local state & the task that created the state is always on the
same machine ?  For instance :

task A for some topic's partition #0 first running on Machine A which
creates localstate SA. When failure occurs or restarts, who does the
scheduler ensure the tuple (Task, Topic partition #, State for Task) are
always bundled together ?


On Sat, Aug 24, 2013 at 7:16 AM, Chris Riccomini <[email protected]>wrote:

> Hey Guys,
>
> I took a shot at updating the docs:
>
>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/introduction/ar
> chitecture.html
>
> The page now reads:
>
> "The input topic is partitioned using Kafka. Each Samza process reads
> messages from one or more of the input topic's partitions, and emits them
> back out to a different Kafka topic. Each output message is keyed by the
> message's member ID attribute, and this key is mapped to one of the
> topic's partitions (usually by hashing the key, and modding by the number
> of partitions in the topic). The Kafka brokers receive these messages, and
> buffer them on disk until the second job (the counting job on the bottom
> of the diagram) reads the messages, and increments its counters."
>
>
> Cheers,
> Chris
>
> On 8/23/13 8:25 AM, "Jay Kreps" <[email protected]> wrote:
>
> >Oh, I think perhaps that documentation is a bit confusing. The member id
> >would be used for partition selection but not every member id would be a
> >partition. For example if you had four partitions you could partition by
> >hash(key) % 4.
> >
> >The partition count essentially bounds the parallelism of the downstream
> >processing (i.e. you cant have more containers, the physical processes,
> >then you have tasks). Formally
> >  max(# upstream partitions) = # tasks < # containers
> >
> >Our observation is that stream jobs don't require massive parallelism in
> >the way that, for example, Hadoop jobs do, though they often process the
> >same data. This is because they run continuously and pipeline processing.
> >
> >In MapReduce if you have a daily job that processes that day's worth of
> >data it blocks all uses of its output until it completes. As a result you
> >end up in a situation where you want to process 24 hours of data in some
> >set of mapreduce jobs and a particular stage may need to complete pretty
> >quickly, say in, 10 minutes. Obvious this requires processing at 24*60/10
> >=
> >144 times the speed of data acquisition. So you need a sudden burst of
> >many
> >tasks to finish this up as quick as possible. Cases where mapreduce
> >processing isn't incremental at all (i.e. all data is reprocessed) are
> >even
> >more extreme. A stream processing task generally only needs to keep up,
> >which is 144 times less demanding. The exception, of course, is if you
> >anticipate periods of downtime you will need to be able to catch up at a
> >reasonable rate.
> >
> >Currently in Kafka the primary downside of high partition count is longer
> >fail-over time. This is a big problem for cases where Kafka is taking live
> >requests that block a website. But for stream processing a 1 second
> >failover is usually fine. This makes partition counts in the ballpark of
> >100k feasible (but we haven't gone there).
> >
> >Longer term we do think even that will be an issue and the plan is to just
> >work on scaling Kafka's ability to handle high partition counts
> >gracefully.
> >
> >Cheers,
> >
> >-Jay
> >
> >
> >On Fri, Aug 23, 2013 at 3:57 AM, Stone <[email protected]> wrote:
> >
> >> Hello,
> >>
> >> As explained in the following docs:
> >>
> >>
> >>
> >>
> http://samza.incubator.apache.org/learn/documentation/0.7.0/introduction/
> >>architecture.html
> >>
> >> The input topic is partitioned using Kafka. Each Samza process reads
> >> messages from one or more of the input topic's partitions, and emits
> >>them
> >> back out to a different Kafka topic keyed by the message's member ID
> >> attribute.
> >>
> >> In the example above, the task will created many topics keyed by
> >>"message's
> >> member ID attribute", if there's millions of intermediate keys, how does
> >> Samza handle the topic limitations of Kafka? (Ref
> >>
> >>
> http://grokbase.com/t/kafka/users/133v60ng6v/limit-on-number-of-kafka-top
> >>ic
> >>  )
> >>
> >>
> >> Best Regards,
> >> Stone
> >>
>
>

Reply via email to