Hey Jordan,

> Couldn't you instead concurrently commit offsets for each owned
>partition by taking the minimum offset of the threads working on that
>partition, minus one? That way, in the worst case, you'd only screw up by
>forgetting to commit some just-finished work until the next call to
>window().

Yes, you could, but that would require changes to Samza itself. The
window() approach can be done today with no changes to Samza.
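
For concreteness, here's a rough sketch of what that looks like. The
pool size and the handle() method are made up, and the exact
TaskCoordinator.commit() signature may differ across Samza versions, so
treat this as an outline rather than a drop-in implementation:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  import org.apache.samza.system.IncomingMessageEnvelope;
  import org.apache.samza.task.MessageCollector;
  import org.apache.samza.task.StreamTask;
  import org.apache.samza.task.TaskCoordinator;
  import org.apache.samza.task.WindowableTask;

  // Requires task.commit.ms=-1 so Samza never checkpoints on its own.
  public class ThreadPoolTask implements StreamTask, WindowableTask {
    private final ExecutorService pool = Executors.newFixedThreadPool(8);
    private final List<Future<?>> inFlight = new ArrayList<Future<?>>();

    public void process(final IncomingMessageEnvelope envelope,
        final MessageCollector collector, TaskCoordinator coordinator) {
      // Hand the message to the pool; process() returns immediately.
      inFlight.add(pool.submit(new Runnable() {
        public void run() {
          handle(envelope, collector);
        }
      }));
    }

    public void window(MessageCollector collector,
        TaskCoordinator coordinator) throws Exception {
      // Block until every handed-off message is fully processed.
      for (Future<?> f : inFlight) {
        f.get();
      }
      inFlight.clear();
      // Only now is it safe to checkpoint offsets.
      coordinator.commit();
    }

    private void handle(IncomingMessageEnvelope envelope,
        MessageCollector collector) {
      // Real per-message work goes here (hypothetical).
    }
  }

Note that process() and window() are both invoked from the single
container thread, so inFlight needs no extra synchronization. Sending
output from the worker threads is a separate can of worms, though, since
the MessageCollector isn't guaranteed to be thread-safe.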



One other aside on the threading situation: if you care about message
ordering, you'll need to make sure that messages are handed off to
threads based on their key or the partition they came from. Otherwise,
t2 could get m1, and t1 could get m2, and t1 might finish processing
first, which would lead to out-of-order processing (effectively
multi-subscriber partitions within a single job).
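
A simple way to get this right is to pin each partition (or key hash)
to its own single-threaded executor, so one partition's messages can
never race each other. A quick sketch (class and names are made up):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Pin each partition to one single-threaded lane so its messages are
  // processed in order; different partitions still run in parallel.
  public class OrderedDispatcher {
    private final ExecutorService[] lanes;

    public OrderedDispatcher(int numThreads) {
      lanes = new ExecutorService[numThreads];
      for (int i = 0; i < numThreads; i++) {
        lanes[i] = Executors.newSingleThreadExecutor();
      }
    }

    public void dispatch(int partition, Runnable work) {
      // Same partition -> same lane -> in-order processing.
      lanes[Math.abs(partition % lanes.length)].execute(work);
    }
  }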


> However, we recently switched to having each machine have as many
>Kafka-managed consumer threads as cores, and did away with the separate
>thread pool.

Unless I'm misunderstanding, this is exactly what Samza does, isn't it?
Each SamzaContainer is single-threaded, so running N of them on a
machine, where N is the number of cores, results in exactly the same
model (since each SamzaContainer has its own consumer threads).
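
In config terms, that's just the container count (property name from
memory, value illustrative):

  # One single-threaded container per core on an 8-core box.
  yarn.container.count=8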

> Since Samza was built with single-threaded containers in mind, it seems
>to me that it might be tricky to get Samza to tell YARN that it wants n
>compute units for a single container. Is there a way to accomplish this?


This trickiness is why we encourage the one-core-per-container model.
You can get around it with the yarn.container.cpu.cores setting,
though. Setting this to a higher number tells YARN that the container
uses more cores.
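
For example (value illustrative; this only has an effect if your YARN
cluster actually schedules on vcores):

  # Advertise 4 vcores for each of this job's containers.
  yarn.container.cpu.cores=4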

Cheers,
Chris

On 10/28/14 12:16 PM, "Jordan Lewis" <[email protected]> wrote:

>Hey Chris,
>
>Thanks for the detailed response.
>
>Your proposed solution for adding parallelism makes sense, but I don't yet
>understand the importance of the blocking step in window(). Couldn't you
>instead concurrently commit offsets for each owned partition by taking the
>minimum offset of the threads working on that partition, minus one? That
>way, in the worst case, you'd only screw up by forgetting to commit some
>just-finished work until the next call to window().
>
>We've had some experience with this strategy before, actually. We used to
>have each machine use a single Kafka worker thread that read from all of
>the partitions that it owned, and sent the messages it consumed to a
>worker
>pool (sized proportionally to the number of cores on the machine) for
>processing. As you mention, it's tricky to do the offset management right
>in
>this way. However, we recently switched to having each machine have as
>many
>Kafka-managed consumer threads as cores, and did away with the separate
>thread pool. We like this approach a lot - it's simple, easy to manage,
>and
>doesn't expose us to data loss. Have you considered adding this kind of
>partition/task based parallelism to Samza? It seems to me that this isn't
>so hard to understand, and seems like it might produce less overhead.
>However, I can also see the appeal of having the simple one thread per
>container model.
>
>Let's pretend for a moment that cross-task memory sharing was implemented,
>and that we also chose the dangerous road of adding multithreading to our
>task implementations. Since Samza was built with single-threaded
>containers
>in mind, it seems to me that it might be tricky to get Samza to tell YARN
>that it wants n compute units for a single container. Is there a way to
>accomplish this?
>
>Thanks,
>Jordan Lewis
>
>On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <
>[email protected]> wrote:
>
>> Hey Jordan,
>>
>> Your question touches on a couple of things:
>>
>> 1. Shared objects between Samza tasks within one container.
>> 2. Multi-threaded SamzaContainers.
>>
>> For (1), there is some discussion on shared state here:
>>
>>   https://issues.apache.org/jira/browse/SAMZA-402
>>
>> The outcome of this ticket was that it's something we want, but aren't
>> implementing right now. The idea is to have a state store that's shared
>> amongst all tasks in a container. The store would be immutable, and
>>would
>> be restored on startup via a stream that had all required data.
>>
>> An alternative to this is to just have a static variable that all tasks
>> use. This will allow all tasks within one container to use the object.
>> We've done this before, and it works reasonably well for immutable
>> objects, which you have.
>>
>> For (2), we've actively tried to avoid adding threading to the
>> SamzaContainer. Having a single threaded container has worked out pretty
>> well for us, and greatly simplifies the mental model that people need to
>> have to use Samza. Our advice to people who ask about adding parallelism
>> is to tell them to add more containers.
>>
>> That said, it is possible to run threads inside a StreamTask if you
>>really
>> want to increase your parallelism. Again, I would advise against this.
>>If
>> not implemented properly, doing so can lead to data loss. The proper way
>> to implement threading inside a StreamTask is to have a thread pool, and
>> to hand messages to its threads as process() is called. You must then
>> disable offset checkpointing by setting task.commit.ms to -1. Lastly,
>>your
>> task must implement WindowableTask. In the window method, it must block
>>on
>> all threads that are currently processing a message. When all threads
>>have
>> finished processing, it's then safe to checkpoint offsets, and the
>>window
>> method must call coordinator.commit().
>>
>> We've written a task that does this as well, and it works, but you have
>>to
>> know what you're doing to get it right.
>>
>> So, I think the two state options are:
>>
>> 1. Wait for global state to be implemented (or implement it yourself
>>:)).
>> This could take a while.
>> 2. Use static objects to share state among StreamTasks in a given
>> SamzaContainer.
>>
>> And for parallelism:
>>
>> 1. Increase partition/container count for your job.
>> 2. Add threads to your StreamTasks.
>>
>> Cheers,
>> Chris
>>
>> On 10/27/14 12:52 PM, "Jordan Lewis" <[email protected]> wrote:
>>
>> >Hi,
>> >
>> >My team is interested in trying out Samza to augment or replace our
>> >hand-rolled Kafka-based stream processing system. I have a question
>>about
>> >sharing memory across task instances.
>> >
>> >Currently, our main stream processing application has some large,
>> >immutable
>> >objects that need to be loaded into JVM heap memory in order to process
>> >messages on any partition of certain topics. We use thread-based
>> >parallelism in our system, so that the Kafka consumer threads on each
>> >machine listening to these topics can use the same instance of these
>>large
>> >heap objects. This is very convenient, as these objects are so large
>>that
>> >storing multiple copies of them would be quite wasteful.
>> >
>> >To use Samza, it seems as though each JVM would have to store copies of
>> >these objects separately, even if we were to use LevelDB's off-heap
>> >storage
>> >- each JVM would eventually have to inflate the off-heap memory into
>> >regular Java objects to be usable. One solution to this problem could
>>be
>> >using something like Google's Flatbuffers [0] for these large objects
>>- so
>> >that we could use accessors on large, off-heap ByteBuffers without
>>having
>> >to actually deserialize them. However, we think that doing this for
>>all of
>> >the relevant data we have would be a lot of work.
>> >
>> >Have you guys considered implementing a thread-based parallelism model
>>for
>> >Samza, whether as a replacement or alongside the current JVM-based
>> >parallelism approach? What obstacles are there to making this happen,
>> >assuming you decided not to do it? This approach would be invaluable
>>for
>> >our use case, since we rely so heavily (perhaps unfortunately so) on
>>these
>> >shared heap data structures.
>> >
>> >Thanks,
>> >Jordan Lewis
>> >
>> >[0]: http://google.github.io/flatbuffers/
>>
>>
