As Roger said (this is a great exchange, btw), I really was attracted to Samza by the idea that I could stitch together flows at runtime vs. defining a bolt up front. However, the config does seem verbose. I would imagine that the stitching doesn't change between dev and production, which makes me feel like it should be something more compile-time, maybe even a fluent builder or some code generation. It kinda feels like you should treat it like an AST.
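Something like this is roughly what I'm picturing. To be clear, it's a hypothetical sketch (JobGraph and friends don't exist in Samza today); the point is just that bad wiring would fail at build time instead of surfacing as a silent runtime problem:

    // Hypothetical fluent builder for assembling a Samza job in code.
    // JobGraph and EnrichOrderTask are made-up names for illustration.
    public class EnrichmentJobMain {
      public static void main(String[] args) {
        JobGraph job = JobGraph.named("order-enrichment")
            .readFrom("orders")                  // input topic declared once, in code
            .readFrom("user_info").asBootstrap() // the "table" side of a join
            .via(EnrichOrderTask.class)          // the StreamTask doing the work
            .writeTo("enriched-orders")          // output topic
            .build();                            // miswiring fails here, at build time
        job.run();
      }
    }

A typo'd topic name would then break the build rather than producing a task that silently never receives messages.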
On Mon, Sep 8, 2014 at 3:36 PM, Chris Riccomini <[email protected]> wrote:

> Hey Roger,
>
> This summary sounds pretty reasonable to me, on the Samza side, anyway. I can't comment on the others, as I don't have a great understanding of their internals.
>
> I agree with you about the knock on Samza's config system. It's something I think we'll need to resolve, but we don't have a concrete proposal on how, yet. Given that you've had a look at these three systems, do you have any feedback on what you'd like to see? The topic-name issue is a relatively specific problem, but maybe you have thoughts on a higher-level design that you'd like to have?
>
> Right now, we have:
>
> 1. We mix wiring and config in config files. Wiring in configs leads to runtime errors. We have no code-level "builder" for assembling a job, and no DI framework.
> 2. Everything is a k-v string pair. Nothing is hierarchical.
> 3. Config is exposed to developers via a single Config object that's nothing more than a Map<String, String> with some helper methods.
>
> Do you have any thoughts on what you liked from the other frameworks, and what you'd like to see Samza do in this area?
>
> Cheers,
> Chris
>
> On 9/8/14 2:23 PM, "Roger Hoover" <[email protected]> wrote:
>
> > Hi Chris,
> >
> > We've been looking at Samza for a number of use cases, ranging from system monitoring to user activity analysis, for a pretty large retail website. For system monitoring, for example, we want to push system performance events into Kafka, then have Samza jobs enrich these events with additional context, such as information about the deployment topology and/or the user who triggered it. Then we want to feed the enriched streams into various data stores, including Elasticsearch and Hadoop, for analysis.
> >
> > I've been diving into Samza, Spark Streaming, and Storm. So far, I'm impressed with the simplicity of Samza. I feel like it's the easiest to reason about and can scale up to larger teams where not everyone is an expert on the framework. I may be wrong about some of my conclusions, so I'm interested if anyone else wants to share their experience.
> >
> > Samza
> > - The simplest to reason about from a developer's perspective. Tasks are single-threaded and the API is very simple to understand. The local state management API simplifies the developer's job a lot and should give excellent performance. Scalability is well defined (parallelism is determined by the number of Kafka partitions + the number of tasks). Topologies are explicit: you create tasks for each step and wire them together.
> > - The simplest to operate, because each task is independent from the others. Kafka protects against backpressure and provides fine-grained fault tolerance. It seems unlikely that a poorly written job/task will hurt more than itself. Other than Kafka, there are minimal network dependencies, and there are relatively few moving parts.
> > - The framework is pretty lightweight, relying on Kafka and YARN to do most of the work. Fantastic documentation!
> > - The primary downside that I see is that there is a lot of wiring involved, and it may be easy to mess up. For example, topic names need to be specified both in the code and the config, and need to match across tasks/jobs. You probably won't know that you've screwed up until you find a task that is not receiving any messages. You also have to change both the code and the config if you change the name of a topic. In larger organizations, these activities might be done by people on different teams.
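> > To make that concrete, the same topic names end up in at least two places. The task.inputs config key is real; the task below is just a minimal sketch, and the topic names are illustrative:
> >
> >     // In the job's .properties file:
> >     //   task.inputs=kafka.orders
> >     //
> >     // ...and the same string literals show up again in code. A typo in
> >     // either place shows up only as a task that never sees messages.
> >     import org.apache.samza.system.IncomingMessageEnvelope;
> >     import org.apache.samza.system.OutgoingMessageEnvelope;
> >     import org.apache.samza.system.SystemStream;
> >     import org.apache.samza.task.MessageCollector;
> >     import org.apache.samza.task.StreamTask;
> >     import org.apache.samza.task.TaskCoordinator;
> >
> >     public class PassThroughTask implements StreamTask {
> >       // "enriched-orders" must also match the consuming job's config.
> >       private static final SystemStream OUTPUT =
> >           new SystemStream("kafka", "enriched-orders");
> >
> >       public void process(IncomingMessageEnvelope envelope,
> >                           MessageCollector collector,
> >                           TaskCoordinator coordinator) {
> >         collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));
> >       }
> >     }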
> > Storm
> > - Trident will do some auto-wiring of topologies, but it's hard to reason about the generated topologies, especially their transactional behavior.
> > - Managing non-local state can be tricky to get right.
> > - There are lots of moving parts that have to be tuned (LMAX Disruptor queues, 0mq queues).
> > - It ships closures around the cluster, which makes it hard to work with things like JRuby.
> > - The framework provides things like cluster management/scheduling that can be done elsewhere (YARN or Mesos).
> >
> > Spark Streaming
> > - Quite a few things are implicit. You write jobs as if they were going to execute in a single process. These get broken into stages and scheduled on the cluster. You need a pretty good understanding of this process to write, debug, and tune these jobs. I think it's fairly easy to accidentally include a reference to a large Java object in worker code and have it shipped across the network unintentionally.
> > - All the code/objects that run on workers must be serializable. I'm guessing this can be quite annoying. Samza ships the jar to the worker, so I think it should be able to run anything (JRuby, for example).
> > - The driver process is a single point of failure and the only place where some job state is saved.
> > - It seems operationally risky. As far as I can tell, a slow consumer can tie up the entire cluster, because there is no backpressure that stops a driver from producing more and more DStreams, even if consumers cannot keep up. The tuning guide suggests that every time you make a change to a job, you need to tune your settings again (https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning).
> >
> > Cheers,
> >
> > Roger
> >
> > On Fri, Sep 5, 2014 at 4:14 PM, Chris Riccomini <[email protected]> wrote:
> >
> > > Hey Roger,
> > >
> > > > After thinking more about it, I don't think we can get to the deterministic behavior we talked about until Kafka supports idempotent producers.
> > >
> > > Yes, I agree. To get fully deterministic behavior, you do need idempotent producers + deterministic message ordering (or Kafka transactionality) at the Kafka level. We plan on relying on Kafka transactionality when the patch is committed. Without this, it's possible to output two different "correct" answers (for my definition of correct), or two of the same "correct" answers (for your definition of correct).
> > >
> > > > I created a JIRA ticket in case it helps spur action or keep the conversation from getting lost.
> > >
> > > Awesome, thanks!
> > >
> > > Out of curiosity, what are you using Samza for? You seem to have quite a deep understanding of it. :)
> > >
> > > Cheers,
> > > Chris
> > >
> > > On 9/4/14 10:06 AM, "Roger Hoover" <[email protected]> wrote:
> > >
> > > > Chris, Chinmay,
> > > >
> > > > After thinking more about it, I don't think we can get to the deterministic behavior we talked about until Kafka supports idempotent producers. The reason is that duplicate messages mean that we can't rely on strong ordering. If a duplicate of update U1 can show up at any time, then we can never rely on ordered updates, because we might see U1, then U2, then U1 again. I guess we could try to handle this at the application layer, but not at the Samza layer yet, I think.
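> > > > By "handle this at the application layer" I mean something like tracking a monotonically increasing version per key and dropping anything that goes backwards. A rough, untested sketch; the per-update version number is my assumption, not something Samza or Kafka provides:
> > > >
> > > >     import java.util.HashMap;
> > > >     import java.util.Map;
> > > >
> > > >     // Application-level duplicate suppression, assuming every update
> > > >     // carries a version that only increases for a given key.
> > > >     public class VersionedDeduper {
> > > >       private final Map<String, Long> lastSeen = new HashMap<String, Long>();
> > > >
> > > >       // True if this (key, version) is new and should be processed.
> > > >       public boolean accept(String key, long version) {
> > > >         Long prev = lastSeen.get(key);
> > > >         if (prev != null && version <= prev) {
> > > >           return false; // a replayed U1 arriving after U2 gets dropped here
> > > >         }
> > > >         lastSeen.put(key, version);
> > > >         return true;
> > > >       }
> > > >     }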
> > > > Nonetheless, some of the changes we discussed may help get closer and still make sense to implement. I created a JIRA ticket in case it helps spur action or keep the conversation from getting lost. If you don't find the ticket useful at this time, feel free to close it.
> > > >
> > > > https://issues.apache.org/jira/browse/SAMZA-405
> > > >
> > > > Cheers,
> > > >
> > > > Roger
> > > >
> > > > On Wed, Sep 3, 2014 at 11:26 AM, Chinmay Soman <[email protected]> wrote:
> > > >
> > > > > > bootstrapping is ever necessary on recovery or rewind. Seems like it's only needed for cold start.
> > > > >
> > > > > I think you're right. Either way, it looks like there should be additional support for this.
> > > > > ________________________________________
> > > > > From: Roger Hoover [[email protected]]
> > > > > Sent: Wednesday, September 03, 2014 9:43 AM
> > > > > To: [email protected]
> > > > > Subject: Re: Trying to achieve deterministic behavior on recovery/rewind
> > > > >
> > > > > Chinmay,
> > > > >
> > > > > Thank you for the feedback. Responses inline.
> > > > >
> > > > > On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman <[email protected]> wrote:
> > > > >
> > > > > > That's interesting!
> > > > > >
> > > > > > 1) Deterministic reading from a bootstrap stream:
> > > > > > We could define a changelog for the local state (which in turn is populated using a bootstrap stream). If the job fails at this point, ideally it should be restored using the changelog stream (instead of bootstrapping again) in order for the job to be deterministic (as you suggest). Thus there could be a check which either selects the bootstrap mode or the changelog restore mode (depending on whether a changelog exists). I'm not sure if this check exists today (I would guess no).
> > > > >
> > > > > Yes, I was wondering about this for an event-table join. If you're only storing the join table in the local store, then the changelog stream and the bootstrap stream are duplicates of each other. One of them is unnecessary unless you add additional state to the local store. In any case, I'm wondering if bootstrapping is ever necessary on recovery or rewind. It seems like it's only needed for a cold start.
> > > > >
> > > > > > 2) Deterministic changelog:
> > > > > > You're right - there could be a (smallish) window where we re-process some of the input records on a container restart. This can happen since the changelog can be a little ahead of the last checkpoint for a given input stream. However, I would argue the changelog is still deterministic in this case. Since Samza currently only guarantees at-least-once semantics, this seems to be OK.
> > > > >
> > > > > Hmmm... it seems like this will violate the correctness definition that Chris outlined, where there may be two or more "correct" choices but the system would guarantee that only one will be produced. But now that you mention it, I don't know if that can ever be guaranteed with an at-least-once system. If a task can always see duplicates, it may process the first with its local state in state S1, then modify its state to S2, and then process the duplicate.
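> > > > > To make that concrete, here's a toy illustration (schematic, not Samza code) of a task whose output depends on mutable local state. Replaying a duplicate after the state has moved from S1 to S2 produces a different answer:
> > > > >
> > > > >     // Schematic: output depends on local state, so a redelivered
> > > > >     // message yields a different result the second time around.
> > > > >     public class RunningCountTask {
> > > > >       private long count = 0; // local state: S1, then S2 after an update
> > > > >
> > > > >       public long process(String msg) {
> > > > >         count += 1;
> > > > >         return count;
> > > > >       }
> > > > >
> > > > >       public static void main(String[] args) {
> > > > >         RunningCountTask t = new RunningCountTask();
> > > > >         System.out.println(t.process("U1")); // 1 (state S1 -> S2)
> > > > >         System.out.println(t.process("U2")); // 2
> > > > >         System.out.println(t.process("U1")); // 3 (duplicate of U1, new output)
> > > > >       }
> > > > >     }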
> > > > > > 3) Deterministic MessageChooser:
> > > > > > The in-order problem could be avoided if we restore the state from its changelog - which was originally populated by a 'bootstrap stream'. The task can then just pick up from where it left off (making the system deterministic). Having said that, there might be value in writing an 'EarliestFirstChooser'.
> > > > >
> > > > > Agreed.
> > > > >
> > > > > > Again, this is just my perception (which could be wrong - I'm still learning).
> > > > > > C
> > > > > >
> > > > > > ________________________________________
> > > > > > From: Roger Hoover [[email protected]]
> > > > > > Sent: Tuesday, September 02, 2014 8:52 AM
> > > > > > To: [email protected]
> > > > > > Subject: Trying to achieve deterministic behavior on recovery/rewind
> > > > > >
> > > > > > Hi Samza devs,
> > > > > >
> > > > > > I think this project has the best documentation I've ever seen! Amazing job. It's extremely well written, and Hello Samza is a really great example that I was able to run + modify without issue. It was a joy reading the docs and playing around with the example. Kudos!
> > > > > >
> > > > > > After thoroughly reading all the docs, I still have a few questions and would appreciate any feedback.
> > > > > >
> > > > > > I was thinking about how to support deterministic behavior on recovery or rewind. Maybe it can't always be 100% deterministic, but I think we can get close. Have other people thought about this? Is it desirable?
> > > > > >
> > > > > > For example, let's say we're joining two streams: orders and user_info. As orders come in, we use the user_id field of the order to look up additional information about the user and enrich the stream. Say we're keeping all the user_info state in the local KV store.
> > > > > >
> > > > > > t1: User updates her email to "[email protected]"
> > > > > > t2: User buys a pair of jeans (product_id == 99)
> > > > > > t3: User updates her email to "[email protected]"
> > > > > >
> > > > > > In the case of normal operation (no delays in the user_info stream), the enriched record will be:
> > > > > >
> > > > > > {product_id: 99, email: "[email protected]", ...}
> > > > > >
> > > > > > But say that our job fails before it can checkpoint and is configured to bootstrap from user_info. When it gets restarted and bootstraps from the user_info stream, it will end up with the email set to "[email protected]" in the local KV store. Then it will reprocess the order event and produce the "wrong" output:
> > > > > >
> > > > > > {product_id: 99, email: "[email protected]", ...}
> > > > > >
> > > > > > I haven't verified that, but the documentation says "a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed." Shouldn't it instead wait until it has consumed up to the checkpoint offset for the bootstrap stream (when there is a saved checkpoint offset)?
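> > > > > > For reference, the kind of join task I have in mind looks roughly like this. This is a sketch only: the store name, topic names, and map-based message format are assumptions for illustration, not from a real job:
> > > > > >
> > > > > >     import java.util.Map;
> > > > > >     import org.apache.samza.config.Config;
> > > > > >     import org.apache.samza.storage.kv.KeyValueStore;
> > > > > >     import org.apache.samza.system.IncomingMessageEnvelope;
> > > > > >     import org.apache.samza.system.OutgoingMessageEnvelope;
> > > > > >     import org.apache.samza.system.SystemStream;
> > > > > >     import org.apache.samza.task.InitableTask;
> > > > > >     import org.apache.samza.task.MessageCollector;
> > > > > >     import org.apache.samza.task.StreamTask;
> > > > > >     import org.apache.samza.task.TaskContext;
> > > > > >     import org.apache.samza.task.TaskCoordinator;
> > > > > >
> > > > > >     public class OrderEnrichmentTask implements StreamTask, InitableTask {
> > > > > >       private static final SystemStream OUTPUT =
> > > > > >           new SystemStream("kafka", "enriched-orders");
> > > > > >       private KeyValueStore<String, Map<String, Object>> userInfo;
> > > > > >
> > > > > >       @SuppressWarnings("unchecked")
> > > > > >       public void init(Config config, TaskContext context) {
> > > > > >         userInfo = (KeyValueStore<String, Map<String, Object>>)
> > > > > >             context.getStore("user-info");
> > > > > >       }
> > > > > >
> > > > > >       @SuppressWarnings("unchecked")
> > > > > >       public void process(IncomingMessageEnvelope envelope,
> > > > > >                           MessageCollector collector,
> > > > > >                           TaskCoordinator coordinator) {
> > > > > >         String stream = envelope.getSystemStreamPartition().getStream();
> > > > > >         Map<String, Object> msg = (Map<String, Object>) envelope.getMessage();
> > > > > >         if ("user_info".equals(stream)) {
> > > > > >           // Table side: remember the latest profile per user.
> > > > > >           userInfo.put((String) msg.get("user_id"), msg);
> > > > > >         } else {
> > > > > >           // Stream side: enrich the order with the user's current email.
> > > > > >           Map<String, Object> user = userInfo.get((String) msg.get("user_id"));
> > > > > >           if (user != null) {
> > > > > >             msg.put("email", user.get("email"));
> > > > > >           }
> > > > > >           collector.send(new OutgoingMessageEnvelope(OUTPUT, msg));
> > > > > >         }
> > > > > >       }
> > > > > >     }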
> > > > > > Likewise for local state replicated in the changelog: during the checkpoint process, Samza could include its producer offset in the checkpoint data, so that during recovery the local state will be restored to a state that corresponds with its offsets for the input streams. Everything would be coherent, rather than having the input streams restored to the checkpoint and the local state restored to the most recent value. I'm assuming that changelog commits for local state and checkpoints are not done together in an atomic transaction, so they may not always match.
> > > > > >
> > > > > > The other missing piece is a nearly deterministic MessageChooser. During recovery + rewind, all the messages in both streams are already present in Kafka, and we want a way to replay them in the same order as if they were played in real time. The only way to approximate this behavior that I can see is to use Kafka broker timestamps for each message. Is it possible to write an "EarliestFirstChooser" that always chooses the oldest message available, according to the timestamp at which it was received by the Kafka broker?
> > > > > >
> > > > > > I don't know if Kafka stores a timestamp with each message, but I'm assuming it does because it supports an API on the simple consumer called getOffsetsBefore() that would seem to map from timestamps to offsets.
> > > > > >
> > > > > > Finally, a nitpick: I'm using Samza 0.7.0, but the metrics data has the version as {"samza-version": "0.0.1"}. Is this intentional?
> > > > > >
> > > > > > If it makes sense, I can put in some JIRA tickets for this stuff...
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Roger
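One more thought on the "EarliestFirstChooser" idea from the thread above: against the MessageChooser interface, I'd picture something like the rough sketch below. Big caveat: as far as I know, the Kafka broker doesn't attach a receive timestamp to individual messages, so this assumes the messages themselves carry an event-time field (the TimestampedMessage type here is made up):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.chooser.BaseMessageChooser;

    public class EarliestFirstChooser extends BaseMessageChooser {
      // Hypothetical message type carrying an event-time field.
      public interface TimestampedMessage {
        long getTimestamp();
      }

      // Samza offers at most one envelope per partition at a time.
      private final List<IncomingMessageEnvelope> buffered =
          new ArrayList<IncomingMessageEnvelope>();

      @Override
      public void update(IncomingMessageEnvelope envelope) {
        buffered.add(envelope);
      }

      @Override
      public IncomingMessageEnvelope choose() {
        IncomingMessageEnvelope earliest = null;
        for (IncomingMessageEnvelope e : buffered) {
          if (earliest == null || timestampOf(e) < timestampOf(earliest)) {
            earliest = e;
          }
        }
        buffered.remove(earliest); // no-op when the buffer is empty
        return earliest;           // null tells Samza there's nothing to process
      }

      private long timestampOf(IncomingMessageEnvelope e) {
        return ((TimestampedMessage) e.getMessage()).getTimestamp();
      }
    }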
--
Dan Di Spaltro