Chinmay,

Thanks for your input.
I'm not understanding what the difference is. With the design that Felix
laid out, the co-located Kafka consumer is still doing a push to the
storage system, right? It just happens to be on the same machine. How is
this different from pushing batches from a non-local Samza job? How does
the pull-based approach you're thinking of deal with feedback and SLAs?

Thanks,

Roger

On Thu, Apr 2, 2015 at 2:54 PM, Chinmay Soman <chinmay.cere...@gmail.com>
wrote:

> My 2 cents => one thing to note about the push model: multi-tenancy.
>
> When your storage system (Druid, for example) is used in a multi-tenant
> fashion, the push model is a bit difficult to operate, primarily because
> there is no real feedback loop from the storage system. Yes, if the
> storage system starts doing badly, then you get timeouts and higher
> latencies, but by then you're already in a position where you're
> probably breaking SLAs (for some tenant).
>
> In that sense, a pull model might be better, since the consumer can
> potentially have more visibility into how this particular node is doing.
> Also, the Kafka consumer batches things up, so theoretically you could
> get similar throughput. The downside of this approach, of course, is
> that the storage system's partitioning scheme *has to* line up with the
> Kafka partitioning scheme.
>
> On Thu, Apr 2, 2015 at 11:41 AM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Felix,
> >
> > I see your point about simple Kafka consumers. My thought was that if
> > you're already managing a Samza/YARN deployment, then these types of
> > jobs would be "just another job" and not require an additional process
> > management/monitoring/operations setup. If you've already got a way to
> > handle vanilla Kafka jobs, then it makes sense.
> >
> > For the push model, the way we're planning to deal with the latency of
> > round-trip calls is to batch up pushes to the downstream system. Both
> > Druid Tranquility and the ES transport node protocol allow you to
> > batch index requests. I'm curious whether pull would really be that
> > much more efficient.
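> >
> > To make "batch up pushes" concrete, here is roughly what it looks like
> > with the ES transport client's Java API (an untested sketch; the class
> > name, method name, and index/type names are made up for illustration):
> >
> >     import java.util.List;
> >     import org.elasticsearch.action.bulk.BulkRequestBuilder;
> >     import org.elasticsearch.action.bulk.BulkResponse;
> >     import org.elasticsearch.client.Client;
> >
> >     // One bulk request per batch: one round trip for N documents,
> >     // instead of N round trips for N single index requests.
> >     public class BatchedEsPusher {
> >         public void flushBatch(Client client, List<String> jsonDocs) {
> >             BulkRequestBuilder bulk = client.prepareBulk();
> >             for (String doc : jsonDocs) {
> >                 bulk.add(client.prepareIndex("events", "event")
> >                                .setSource(doc));
> >             }
> >             BulkResponse response = bulk.get();
> >             if (response.hasFailures()) {
> >                 // All the feedback we get is per-request failures;
> >                 // there is no visibility into how loaded the node
> >                 // actually is.
> >             }
> >         }
> >     }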
> >
> > Cheers,
> >
> > Roger
> >
> > On Wed, Apr 1, 2015 at 10:26 AM, Felix GV
> > <fville...@linkedin.com.invalid> wrote:
> >
> > > Hi Roger,
> > >
> > > You bring up good points, and I think the short answer is that there
> > > are trade-offs to everything, of course (:
> > >
> > > What I described could definitely be implemented as a Samza job, and
> > > I think that would make a lot of sense if the data serving system was
> > > also deployed via YARN. This way, the Samza tasks responsible for
> > > ingesting and populating the data serving system's nodes could be
> > > spawned wherever YARN knows these nodes are located. For data serving
> > > systems not well integrated with YARN, however, I'm not sure there
> > > would be that much win in using the Samza deployment model. And since
> > > the consumers themselves are pretty simple (no joining of streams, no
> > > local state, etc.), this seems to be a case where Samza is a bit
> > > overkill and a regular Kafka consumer is perfectly fine (except for
> > > the YARN-enabled auto-deployment aspect, like I mentioned).
> > >
> > > As for push versus pull, I think the trade-off is the following: push
> > > is mostly simpler and more decoupled, as you said, but I think pull
> > > would be more efficient. The reason is that Kafka consumption is very
> > > efficient (thanks to batching and compression), but most data serving
> > > systems don't provide a streaming ingest API for pushing data to them
> > > efficiently; instead, they have single-record put/insert APIs which
> > > require a round trip to be acknowledged. This is perfectly fine in
> > > low-throughput scenarios, but does not support the very high
> > > ingestion throughput that Kafka can provide. By co-locating the
> > > pulling process (i.e., the Kafka consumer) with the data serving
> > > node, it becomes a bit more affordable to do single puts, since the
> > > (local) round-trip acks would be near-instantaneous. Pulling also
> > > makes the tracking of offsets across different nodes a bit easier,
> > > since each node can consume at its own pace and resume at whatever
> > > point in the past it needs (i.e., rewind) without affecting the other
> > > replicas. Tracking offsets across many replicas in the push model is
> > > a bit more annoying, though still doable, of course.
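> > >
> > > To illustrate, the per-node ingestion loop could look roughly like
> > > this (a rough, untested sketch using the Java Kafka consumer API; the
> > > LocalStore interface is a stand-in I made up for the serving node's
> > > local write path):
> > >
> > >     import java.time.Duration;
> > >     import java.util.Collections;
> > >     import java.util.Properties;
> > >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> > >     import org.apache.kafka.clients.consumer.ConsumerRecords;
> > >     import org.apache.kafka.clients.consumer.KafkaConsumer;
> > >     import org.apache.kafka.common.TopicPartition;
> > >
> > >     // Stand-in for the serving node's local write path (made up).
> > >     interface LocalStore {
> > >         void put(byte[] key, byte[] value);
> > >     }
> > >
> > >     public class NodeIngestLoop {
> > >         public static void run(LocalStore store, int localPartition) {
> > >             Properties props = new Properties();
> > >             props.put("bootstrap.servers", "localhost:9092");
> > >             props.put("group.id", "serving-ingest");
> > >             props.put("enable.auto.commit", "false");
> > >             props.put("key.deserializer",
> > >                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > >             props.put("value.deserializer",
> > >                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > >             KafkaConsumer<byte[], byte[]> consumer =
> > >                 new KafkaConsumer<>(props);
> > >
> > >             // Each node consumes only the partition(s) it serves.
> > >             consumer.assign(Collections.singletonList(
> > >                 new TopicPartition("enriched-data", localPartition)));
> > >
> > >             while (true) {
> > >                 // Batched, compressed fetch from Kafka.
> > >                 ConsumerRecords<byte[], byte[]> records =
> > >                     consumer.poll(Duration.ofMillis(500));
> > >                 for (ConsumerRecord<byte[], byte[]> record : records) {
> > >                     // Local put: the ack never leaves the machine.
> > >                     store.put(record.key(), record.value());
> > >                 }
> > >                 // Each node checkpoints its own offsets and can
> > >                 // rewind independently of the other replicas.
> > >                 consumer.commitSync();
> > >             }
> > >         }
> > >     }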
> > >
> > > --
> > >
> > > Felix GV
> > > Data Infrastructure Engineer
> > > Distributed Data Systems
> > > LinkedIn
> > >
> > > f...@linkedin.com
> > > linkedin.com/in/felixgv
> > >
> > > ________________________________________
> > > From: Roger Hoover [roger.hoo...@gmail.com]
> > > Sent: Tuesday, March 31, 2015 8:57 PM
> > > To: dev@samza.apache.org
> > > Subject: Re: How do you serve the data computed by Samza?
> > >
> > > Ah, thanks for the great explanation. Any particular reason that the
> > > job(s) you described should not be Samza jobs?
> > >
> > > We've started experimenting with such jobs for Druid and
> > > Elasticsearch. For Elasticsearch, the Samza job containers join the
> > > Elasticsearch cluster as transport nodes and use the Java API to push
> > > to ES data nodes. Likewise for Druid, the Samza job uses the
> > > Tranquility API to schedule jobs (
> > > https://github.com/metamx/tranquility/tree/master/src/main/scala/com/metamx/tranquility/samza
> > > ).
> > >
> > > The nice part about push versus pull is that the downstream system
> > > does not need plugins (like ES rivers) that may complicate its
> > > configuration or destabilize the system.
> > >
> > > Cheers,
> > >
> > > Roger
> > >
> > > On Tue, Mar 31, 2015 at 10:56 AM, Felix GV
> > > <fville...@linkedin.com.invalid> wrote:
> > >
> > > > Thanks for your reply Roger! Very insightful (:
> > > >
> > > > > 6. If there was a highly-optimized and reliable way of ingesting
> > > > > partitioned streams quickly into your online serving system,
> > > > > would that help you leverage Samza more effectively?
> > > >
> > > > >> 6. Can you elaborate please?
> > > >
> > > > Sure. The feature set I have in mind is the following:
> > > >
> > > > * Provide a thinly-wrapped Kafka producer which does appropriate
> > > > partitioning and includes useful metadata (such as production
> > > > timestamp, etc.) alongside the payload. This producer would be used
> > > > in the last step of processing of a Samza topology, in order to
> > > > emit to Kafka some processed/joined/enriched data which is destined
> > > > for online serving.
> > > > * Provide a consumer process which can be co-located on the same
> > > > hosts as your data serving system. This process consumes from the
> > > > appropriate partitions and checkpoints its offsets on its own. It
> > > > leverages Kafka batching and compression to make consumption very
> > > > efficient.
> > > > * For each record, the consumer process issues a put/insert locally
> > > > to the co-located serving process. Since this is a local operation,
> > > > it is also very cheap and efficient.
> > > > * The consumer process can also optionally throttle its insertion
> > > > rate by monitoring some performance metrics of the co-located data
> > > > serving process. For example, if the data serving process exposes a
> > > > p99 latency via JMX or other means, this can be used in a tight
> > > > feedback loop to back off if read latency degrades beyond a certain
> > > > threshold.
> > > > * This ingestion platform should be easy to integrate with any
> > > > consistently-routed data serving system, by implementing some
> > > > simple interfaces to let the ingestion system understand the
> > > > key-to-partition assignment strategy, as well as the
> > > > partition-to-node assignment strategy. Optionally, a hook to access
> > > > performance metrics could also be implemented if throttling is
> > > > deemed important (as described in the previous point).
> > > > * Since the consumer lives in a separate process, the system
> > > > benefits from good isolation guarantees. The consumer process can
> > > > be capped to a low amount of heap, and its GC is inconsequential
> > > > for the serving platform. It's also possible to bounce the consumer
> > > > and data serving processes independently of each other, if need be.
> > > >
> > > > There are some more nuances and additional features which could be
> > > > nice to have, but that's the general idea.
> > > >
> > > > It seems to me like such a system would be valuable, but I'm
> > > > wondering what other people in the open-source community think,
> > > > which is why I was interested in starting this thread...
> > > >
> > > > Thanks for your feedback!
> > > >
> > > > -F
>
> --
> Thanks and regards
>
> Chinmay Soman
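P.S. Felix, regarding the "simple interfaces" for integrating with a
consistently-routed serving system: I imagine something along these lines
(just a strawman; every name here is invented):

    import java.util.List;

    // Maps a record key to a partition; must match the Kafka
    // partitioning of the upstream topic.
    public interface PartitioningStrategy {
        int partitionForKey(byte[] key, int numPartitions);
    }

    // Maps a partition to the serving nodes (replicas) hosting it, so
    // the consumer process knows which partitions to ingest locally.
    public interface PartitionPlacement {
        List<String> nodesForPartition(int partition);
    }

    // Optional hook for the throttling feedback loop you described,
    // e.g. backed by a JMX scrape of the serving process.
    public interface ServingNodeMetrics {
        double readLatencyP99Millis();
    }

The consumer loop could then pause its Kafka consumption whenever
readLatencyP99Millis() crosses a threshold, and resume once the serving
node recovers.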