Hey Yi,

What does the JobCoordinator do? YARN/Mesos/etc would be doing the actual
resource assignment, process restart, etc, right? Is the additional value
add of the JobCoordinator just partition management?
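
For concreteness, the get()/onAssigned()/onRevoke() interface quoted below
could be sketched roughly as follows. This is only an illustration under
stated assumptions: the type names (AssignmentListener, StaticJobCoordinator)
and the modulo-based static assignment are hypothetical and are not existing
Samza or Kafka APIs. Only the three method names come from this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback a Samza worker would implement (names are assumptions).
interface AssignmentListener {
    void onAssigned(List<Integer> partitions);
    void onRevoke(List<Integer> partitions);
}

// Hypothetical pluggable coordinator: workers ask it for their partitions.
interface JobCoordinator {
    List<Integer> get(int workerId);
}

// Toy static strategy: partition p is owned by worker (p % workerCount).
class StaticJobCoordinator implements JobCoordinator {
    private final int partitionCount;
    private final int workerCount;

    StaticJobCoordinator(int partitionCount, int workerCount) {
        if (partitionCount < 0 || workerCount <= 0) {
            throw new IllegalArgumentException("invalid partition/worker count");
        }
        this.partitionCount = partitionCount;
        this.workerCount = workerCount;
    }

    @Override
    public List<Integer> get(int workerId) {
        if (workerId < 0 || workerId >= workerCount) {
            throw new IllegalArgumentException("unknown worker " + workerId);
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = workerId; p < partitionCount; p += workerCount) {
            owned.add(p);
        }
        return owned;
    }
}

class JobCoordinatorSketch {
    public static void main(String[] args) {
        JobCoordinator coordinator = new StaticJobCoordinator(8, 3);
        AssignmentListener listener = new AssignmentListener() {
            @Override
            public void onAssigned(List<Integer> partitions) {
                System.out.println("assigned: " + partitions);
            }

            @Override
            public void onRevoke(List<Integer> partitions) {
                System.out.println("revoked: " + partitions);
            }
        };
        // A worker asks for its partitions and is notified of the result.
        listener.onAssigned(coordinator.get(0));
    }
}
```

In this sketch the execution framework (YARN, Mesos, etc.) would only start
and restart the worker process. Everything the coordinator adds is the
mapping from workers to partitions, which is what the question is about.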

-Jay

On Thu, Jul 2, 2015 at 11:32 AM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, all,
>
>
> Thanks Chris for sending out this proposal and Jay for sharing the
> extremely illustrative prototype code.
>
>
> I have been thinking it over many times and want to list out my personal
> opinions below:
>
> 1. Generally, I agree with most of the people here on the mailing list on
> two points:
>
>    a. Deeper integration w/ Kafka is great. No more confusing mapping from
> SystemStreamPartition to TopicPartition etc.
>
>    b. Separating ingestion from transformation greatly simplifies the
> system APIs.
>
> Having the above two changes would allow us to remove many unnecessary
> complexities introduced by those pluggable interfaces Chris pointed out,
> e.g. pluggable streaming systems and serde.
>
>
> To recall one of Chris’s statements on difficulties in dynamic deployment, I
> believe that the difficulties are mainly the result of tight-coupling of
> partition assignment vs the container deployment in the current system. The
> current container deployment requires a pre-defined partition assignment
> strategy coupled together w/ the deployment configuration before we can
> submit to YARN and start the Samza container, which makes the launching
> process super long. Also, fault-tolerance and the embedded JobCoordinator
> code in the YARN AppMaster make dynamic deployment even more complex and
> difficult.
>
>
> First, borrowing Yan’s term, let’s call the Samza standalone process a
> Samza worker. Here is what I have been thinking:
>
> 1. Separate the execution framework from partition assignment/load
> balancing:
>
>     a. a Samza worker should be launched by execution framework that only
> deals w/ process placement to available nodes. The execution framework now
> should only deal w/ how many such processes are needed, where to put them,
> and how to keep them alive.
>
>     b. Partition assignment/load balancing can be a pluggable interface in
> Samza that allows the Samza workers to ask for partition assignments. Let’s
> borrow the name JobCoordinator for now. To allow fault-tolerance in case of
> failure, the partition assignments to workers need to be dynamic. Hence,
> the abstract interface would be much like what Jay’s code illustrates:
> get()/onAssigned()/onRevoke(). The implementation of the partition
> assignment can be either:
>
>         a) completely rely on Kafka.
>
>         b) explicit partition assignment via JobCoordinator. Chris’s work
> in SAMZA-516 can be easily incorporated here. The use case in SAMZA-41 that
> runs Samza ProcessJob w/ static partition assignment can be implemented as
> a JobCoordinator via any home-grown implementation of a distributed
> coordinator. All the work we did at LinkedIn to support dynamic partition
> assignment and host-affinity (SAMZA-617) can be nicely reused as an
> implementation of JobCoordinator.
>
>
> Once we have done the above work, I can see three usage patterns in Samza:
>
>    a. Samza as a library: Samza has a set of libraries to provide stream
> processing, just like a third Kafka client (as illustrated in Jay’s
> example). The execution/deployment is totally controlled by the application
> and the partition coordination is done via Kafka
>
>    b. Samza as a process: Samza runs as a standalone process. There may not
> be an execution framework to launch and deploy Samza processes. The
> partition assignment is done via a pluggable JobCoordinator.
>
>    c. Samza as a service: Samza runs as a collection of processes. There
> will be an execution framework to allocate resources, launch and deploy
> Samza workers and keep them alive. The same pluggable JobCoordinator is
> desirable here as well.
>
>
> Lastly, I would argue that CopyCat in KIP-26 should probably follow the
> same model. Hence, in the Samza-as-a-service model, as at LinkedIn, we can
> use the same fault-tolerant execution framework to run CopyCat and Samza w/o
> the need to operate two service platforms, which should address Sriram’s
> comment in the email thread.
>
>
> Hope the above makes sense. Thanks all!
>
>
> -Yi
>
> On Thu, Jul 2, 2015 at 9:53 AM, Sriram <sriram....@gmail.com> wrote:
>
> > One thing that is worth exploring is to have a transformation and
> > ingestion library in Kafka but use the same framework for fault tolerance,
> > resource isolation and management. The biggest difference I see in these
> > two use cases is the API and data model.
> >
> >
> > > On Jul 2, 2015, at 8:59 AM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > Hey Garry,
> > >
> > > Yeah that's super frustrating. I'd be happy to chat more about this if
> > > you'd be interested. I think Chris and I started with the idea of "what
> > > > would it take to make Samza a kick-ass ingestion tool" but ultimately we
> > > > kind of came around to the idea that ingestion and transformation had
> > > > pretty different needs and coupling the two made things hard.
> > >
> > > > For what it's worth I think copycat (KIP-26) actually will do what you are
> > > > looking for.
> > >
> > > > With regard to your point about slider, I don't necessarily disagree. But I
> > > > think getting good YARN support is quite doable and I think we can make
> > > > that work well. I think the issue this proposal solves is that technically
> > > > it is pretty hard to support multiple cluster management systems the way
> > > > things are now: you need to write an "app master" or "framework" for each
> > > > and they are all a little different so testing is really hard. In the
> > > > absence of this we have been stuck with just YARN, which has fantastic
> > > > penetration in the Hadoopy part of the org, but zero penetration elsewhere.
> > > > Given the huge amount of work being put in to slider, marathon, aws
> > > > tooling, not to mention the umpteen related packaging technologies people
> > > > want to use (Docker, Kubernetes, various cloud-specific deploy tools, etc)
> > > > I really think it is important to get this right.
> > >
> > > -Jay
> > >
> > > On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington <
> > > g.turking...@improvedigital.com> wrote:
> > >
> > >> Hi all,
> > >>
> > >> I think the question below re does Samza become a sub-project of Kafka
> > >> highlights the broader point around migration. Chris mentions Samza's
> > > >> maturity is heading towards a v1 release but I'm not sure it feels right to
> > > >> launch a v1 then immediately plan to deprecate most of it.
> > >>
> > > >> From a selfish perspective I have some guys who have started working with
> > > >> Samza and building some new consumers/producers was next up. Sounds like
> > > >> that is absolutely not the direction to go. I need to look into the KIP in
> > > >> more detail but for me the attractiveness of adding new Samza
> > > >> consumer/producers -- even if yes all they were doing was really getting
> > > >> data into and out of Kafka -- was to avoid having to worry about the
> > > >> lifecycle management of external clients. If there is a generic Kafka
> > > >> ingress/egress layer that I can plug a new connector into and have a lot of
> > > >> the heavy lifting re scale and reliability done for me then it gives me all
> > > >> the pushing new consumers/producers would. If not then it complicates my
> > > >> operational deployments.
> > >>
> > > >> Which is similar to my other question with the proposal -- if we build a
> > > >> fully available/stand-alone Samza plus the requisite shims to integrate
> > > >> with Slider etc I suspect the former may be a lot more work than we think.
> > > >> We may make it much easier for a newcomer to get something running but
> > > >> having them step up and get a reliable production deployment may still
> > > >> dominate mailing list traffic, if for different reasons than today.
> > >>
> > > >> Don't get me wrong -- I'm comfortable with making the Samza dependency on
> > > >> Kafka much more explicit and I absolutely see the benefits in the
> > > >> reduction of duplication and clashing terminologies/abstractions that
> > > >> Chris/Jay describe. Samza as a library would likely be a very nice tool to
> > > >> add to the Kafka ecosystem. I just have the concerns above re the
> > > >> operational side.
> > >>
> > >> Garry
> > >>
> > >> -----Original Message-----
> > >> From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
> > >> Sent: 02 July 2015 12:56
> > >> To: dev@samza.apache.org
> > >> Subject: Re: Thoughts and obesrvations on Samza
> > >>
> > >> Very interesting thoughts.
> > >> From outside, I have always perceived Samza as a computing layer over
> > >> Kafka.
> > >>
> > > >> The question, maybe a bit provocative, is "should Samza be a sub-project
> > > >> of Kafka then?"
> > >> Or does it make sense to keep it as a separate project with a separate
> > >> governance?
> > >>
> > >> Cheers,
> > >>
> > >> --
> > >> Gianmarco
> > >>
> > >>> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote:
> > >>>
> > >>> Overall, I agree to couple with Kafka more tightly. Because Samza de
> > > >>> facto is based on Kafka, and it should leverage what Kafka has. At the
> > > >>> same time, Kafka does not need to reinvent what Samza already has. I
> > >>> also like the idea of separating the ingestion and transformation.
> > >>>
> > > >>> But it is a little difficult for me to imagine what Samza will look
> > > >>> like. And I feel Chris and Jay differ a little in terms of what
> > > >>> Samza should look like.
> > >>>
> > > >>> *** Will it look like what Jay's code shows (A client of Kafka) ? And
> > >>> user's application code calls this client?
> > >>>
> > >>> 1. If we make Samza be a library of Kafka (like what the code shows),
> > > >>> how do we implement auto-balance and fault-tolerance? Are they taken
> > > >>> care of by the Kafka broker or another mechanism, such as "Samza worker"
> > >>> (just make up the name) ?
> > >>>
> > >>> 2. What about other features, such as auto-scaling, shared state,
> > >>> monitoring?
> > >>>
> > >>>
> > >>> *** If we have Samza standalone, (is this what Chris suggests?)
> > >>>
> > > >>> 1. we still need to ingest data from Kafka and produce to it. Then it
> > > >>> becomes the same as what Samza looks like now, except it does not rely
> > > >>> on Yarn anymore.
> > >>>
> > >>> 2. if it is standalone, how can it leverage Kafka's metrics, logs,
> > >>> etc? Use Kafka code as the dependency?
> > >>>
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Fang, Yan
> > >>> yanfang...@gmail.com
> > >>>
> > >>>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Read through the code example and it looks good to me. A few
> > >>>> thoughts regarding deployment:
> > >>>>
> > > >>>> Today Samza deploys as an executable runnable, like:
> > >>>>
> > > >>>> deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
> > >>>>
> > > >>>> And this proposal advocates for deploying Samza more as embedded
> > > >>>> libraries in user application code (ignoring the terminology since
> > > >>>> it is not the same as the prototype code):
> > >>>>
> > > >>>> StreamTask task = new MyStreamTask(configs);
> > > >>>> Thread thread = new Thread(task);
> > > >>>> thread.start();
> > >>>>
> > > >>>> I think both of these deployment modes are important for different
> > > >>>> types of users. That said, I think making Samza purely standalone is
> > > >>>> still sufficient for either runnable or library modes.
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io>
> > wrote:
> > >>>>>
> > >>>>> Looks like gmail mangled the code example, it was supposed to look
> > >>>>> like
> > >>>>> this:
> > >>>>>
> > >>>>> Properties props = new Properties();
> > > >>>>> props.put("bootstrap.servers", "localhost:4242");
> > > >>>>> StreamingConfig config = new StreamingConfig(props);
> > > >>>>> config.subscribe("test-topic-1", "test-topic-2");
> > > >>>>> config.processor(ExampleStreamProcessor.class);
> > > >>>>> config.serialization(new StringSerializer(), new StringDeserializer());
> > > >>>>> KafkaStreaming container = new KafkaStreaming(config);
> > > >>>>> container.run();
> > >>>>>
> > >>>>> -Jay
> > >>>>>
> > >>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io>
> > >> wrote:
> > >>>>>
> > >>>>>> Hey guys,
> > >>>>>>
> > > >>>>>> This came out of some conversations Chris and I were having
> > > >>>>>> around whether it would make sense to use Samza as a kind of data
> > > >>>>>> ingestion framework for Kafka (which ultimately led to KIP-26
> > > >>>>>> "copycat"). This kind of combined with complaints around config and
> > > >>>>>> YARN and the discussion around how to best do a standalone mode.
> > >>>>>>
> > >>>>>> So the thought experiment was, given that Samza was basically
> > >>>>>> already totally Kafka specific, what if you just embraced that
> > >>>>>> and turned it
> > >>>> into
> > >>>>>> something less like a heavyweight framework and more like a
> > >>>>>> third
> > >>> Kafka
> > >>>>>> client--a kind of "producing consumer" with state management
> > >>>> facilities.
> > >>>>>> Basically a library. Instead of a complex stream processing
> > >>>>>> framework
> > >>>>> this
> > >>>>>> would actually be a very simple thing, not much more complicated
> > >>>>>> to
> > >>> use
> > >>>>> or
> > > >>>>>> operate than a Kafka consumer. As Chris said, when we thought about
> > > >>>>>> it, a lot of what Samza (and the other stream processing systems) were
> > > >>>>>> doing seemed like kind of a hangover from MapReduce.
> > >>>>>>
> > >>>>>> Of course you need to ingest/output data to and from the stream
> > >>>>>> processing. But when we actually looked into how that would
> > >>>>>> work,
> > >>> Samza
> > >>>>>> isn't really an ideal data ingestion framework for a bunch of
> > >>> reasons.
> > >>>> To
> > >>>>>> really do that right you need a pretty different internal data
> > >>>>>> model
> > >>>> and
> > >>>>>> set of apis. So what if you split them and had an api for Kafka
> > >>>>>> ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> > >>>>>> transformation (Samza).
> > >>>>>>
> > >>>>>> This would also allow really embracing the same terminology and
> > >>>>>> conventions. One complaint about the current state is that the
> > >>>>>> two
> > >>>>> systems
> > >>>>>> kind of feel bolted on. Terminology like "stream" vs "topic" and
> > >>>>> different
> > >>>>>> config and monitoring systems means you kind of have to learn
> > >>>>>> Kafka's
> > >>>>> way,
> > >>>>>> then learn Samza's slightly different way, then kind of
> > >>>>>> understand
> > >>> how
> > >>>>> they
> > >>>>>> map to each other, which having walked a few people through this
> > >>>>>> is surprisingly tricky for folks to get.
> > >>>>>>
> > > >>>>>> Since I have been spending a lot of time on airplanes I hacked
> > > >>>>>> up an earnest but still somewhat incomplete prototype of what
> > > >>>>>> this would look like. This is just unceremoniously dumped into Kafka
> > > >>>>>> as it required a few changes to the new consumer. Here is the code:
> > > >>>>>> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> > >>>>>>
> > >>>>>> For the purpose of the prototype I just liberally renamed
> > >>>>>> everything
> > >>> to
> > >>>>>> try to align it with Kafka with no regard for compatibility.
> > >>>>>>
> > >>>>>> To use this would be something like this:
> > >>>>>> Properties props = new Properties();
> > >>>>>> props.put("bootstrap.servers", "localhost:4242");
> > >>>>>> StreamingConfig config = new
> > >>> StreamingConfig(props);
> > >>>>> config.subscribe("test-topic-1",
> > >>>>>> "test-topic-2"); config.processor(ExampleStreamProcessor.class);
> > >>>>> config.serialization(new
> > >>>>>> StringSerializer(), new StringDeserializer()); KafkaStreaming
> > >>>> container =
> > >>>>>> new KafkaStreaming(config); container.run();
> > >>>>>>
> > >>>>>> KafkaStreaming is basically the SamzaContainer; StreamProcessor
> > >>>>>> is basically StreamTask.
> > >>>>>>
> > >>>>>> So rather than putting all the class names in a file and then
> > >>>>>> having
> > >>>> the
> > >>>>>> job assembled by reflection, you just instantiate the container
> > >>>>>> programmatically. Work is balanced over however many instances
> > >>>>>> of
> > >>> this
> > >>>>> are
> > >>>>>> alive at any time (i.e. if an instance dies, new tasks are added
> > >>>>>> to
> > >>> the
> > >>>>>> existing containers without shutting them down).
> > >>>>>>
> > >>>>>> We would provide some glue for running this stuff in YARN via
> > >>>>>> Slider, Mesos via Marathon, and AWS using some of their tools
> > >>>>>> but from the
> > >>>> point
> > >>>>> of
> > >>>>>> view of these frameworks these stream processing jobs are just
> > >>>> stateless
> > >>>>>> services that can come and go and expand and contract at will.
> > >>>>>> There
> > >>> is
> > >>>>> no
> > >>>>>> more custom scheduler.
> > >>>>>>
> > >>>>>> Here are some relevant details:
> > >>>>>>
> > >>>>>>   1. It is only ~1300 lines of code, it would get larger if we
> > >>>>>>   productionized but not vastly larger. We really do get a ton
> > >>>>>> of
> > >>>>> leverage
> > >>>>>>   out of Kafka.
> > >>>>>>   2. Partition management is fully delegated to the new consumer.
> > >>> This
> > >>>>>>   is nice since now any partition management strategy available
> > >>>>>> to
> > >>>> Kafka
> > >>>>>>   consumer is also available to Samza (and vice versa) and with
> > >>>>>> the
> > >>>>> exact
> > >>>>>>   same configs.
> > >>>>>>   3. It supports state as well as state reuse
> > >>>>>>
> > >>>>>> Anyhow take a look, hopefully it is thought provoking.
> > >>>>>>
> > >>>>>> -Jay
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <
> > >>>> criccom...@apache.org>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hey all,
> > >>>>>>>
> > >>>>>>> I have had some discussions with Samza engineers at LinkedIn
> > >>>>>>> and
> > >>>>> Confluent
> > >>>>>>> and we came up with a few observations and would like to
> > >>>>>>> propose
> > >>> some
> > >>>>>>> changes.
> > >>>>>>>
> > >>>>>>> We've observed some things that I want to call out about
> > >>>>>>> Samza's
> > >>>> design,
> > >>>>>>> and I'd like to propose some changes.
> > >>>>>>>
> > >>>>>>> * Samza is dependent upon a dynamic deployment system.
> > >>>>>>> * Samza is too pluggable.
> > >>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer
> > >>>>>>> APIs
> > >>> are
> > >>>>>>> trying to solve a lot of the same problems.
> > >>>>>>>
> > >>>>>>> All three of these issues are related, but I'll address them in
> > >>> order.
> > >>>>>>>
> > >>>>>>> Deployment
> > >>>>>>>
> > >>>>>>> Samza strongly depends on the use of a dynamic deployment
> > >>>>>>> scheduler
> > >>>> such
> > >>>>>>> as
> > >>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet that
> > >>>>>>> there
> > >>>> would
> > >>>>>>> be
> > >>>>>>> one or two winners in this area, and we could support them, and
> > >>>>>>> the
> > >>>> rest
> > >>>>>>> would go away. In reality, there are many variations.
> > >>>>>>> Furthermore,
> > >>>> many
> > >>>>>>> people still prefer to just start their processors like normal
> > >>>>>>> Java processes, and use traditional deployment scripts such as
> > >>>>>>> Fabric,
> > >>>> Chef,
> > >>>>>>> Ansible, etc. Forcing a deployment system on users makes the
> > >>>>>>> Samza start-up process really painful for first time users.
> > >>>>>>>
> > >>>>>>> Dynamic deployment as a requirement was also a bit of a
> > >>>>>>> mis-fire
> > >>>> because
> > >>>>>>> of
> > >>>>>>> a fundamental misunderstanding between the nature of batch jobs
> > >>>>>>> and
> > >>>>> stream
> > > >>>>>>> processing jobs. Early on, we made a conscious effort to favor
> > >>>>>>> the
> > >>>> Hadoop
> > >>>>>>> (Map/Reduce) way of doing things, since it worked and was well
> > >>>>> understood.
> > >>>>>>> One thing that we missed was that batch jobs have a definite
> > >>>> beginning,
> > >>>>>>> and
> > >>>>>>> end, and stream processing jobs don't (usually). This leads to
> > >>>>>>> a
> > >>> much
> > >>>>>>> simpler scheduling problem for stream processors. You basically
> > >>>>>>> just
> > >>>>> need
> > >>>>>>> to find a place to start the processor, and start it. The way
> > >>>>>>> we run grids, at LinkedIn, there's no concept of a cluster
> > >>>>>>> being "full". We always
> > >>>> add
> > >>>>>>> more machines. The problem with coupling Samza with a scheduler
> > >>>>>>> is
> > >>>> that
> > >>>>>>> Samza (as a framework) now has to handle deployment. This pulls
> > >>>>>>> in a
> > >>>>> bunch
> > > >>>>>>> of things such as configuration distribution (config stream), shell
> > > >>>>>>> scripts (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
> > >>>>>>>
> > >>>>>>> Another reason for requiring dynamic deployment was to support
> > >>>>>>> data locality. If you want to have locality, you need to put
> > >>>>>>> your
> > >>>> processors
> > >>>>>>> close to the data they're processing. Upon further
> > >>>>>>> investigation,
> > >>>>> though,
> > >>>>>>> this feature is not that beneficial. There is some good
> > >>>>>>> discussion
> > >>>> about
> > >>>>>>> some problems with it on SAMZA-335. Again, we took the
> > >>>>>>> Map/Reduce
> > >>>> path,
> > >>>>>>> but
> > >>>>>>> there are some fundamental differences between HDFS and Kafka.
> > >>>>>>> HDFS
> > >>>> has
> > >>>>>>> blocks, while Kafka has partitions. This leads to less
> > >>>>>>> optimization potential with stream processors on top of Kafka.
> > >>>>>>>
> > >>>>>>> This feature is also used as a crutch. Samza doesn't have any
> > >>>>>>> built
> > >>> in
> > >>>>>>> fault-tolerance logic. Instead, it depends on the dynamic
> > >>>>>>> deployment scheduling system to handle restarts when a
> > >>>>>>> processor dies. This has
> > >>>>> made
> > > >>>>>>> it very difficult to write a standalone Samza container (SAMZA-516).
> > >>>>>>>
> > >>>>>>> Pluggability
> > >>>>>>>
> > >>>>>>> In some cases pluggability is good, but I think that we've gone
> > >>>>>>> too
> > >>>> far
> > >>>>>>> with it. Currently, Samza has:
> > >>>>>>>
> > >>>>>>> * Pluggable config.
> > >>>>>>> * Pluggable metrics.
> > >>>>>>> * Pluggable deployment systems.
> > > >>>>>>> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
> > >>>>>>> * Pluggable serdes.
> > >>>>>>> * Pluggable storage engines.
> > > >>>>>>> * Pluggable strategies for just about every component (MessageChooser,
> > > >>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc).
> > >>>>>>>
> > >>>>>>> There's probably more that I've forgotten, as well. Some of
> > >>>>>>> these
> > >>> are
> > >>>>>>> useful, but some have proven not to be. This all comes at a cost:
> > >>>>>>> complexity. This complexity is making it harder for our users
> > >>>>>>> to
> > >>> pick
> > >>>> up
> > >>>>>>> and use Samza out of the box. It also makes it difficult for
> > >>>>>>> Samza developers to reason about what the characteristics of
> > >>>>>>> the container (since the characteristics change depending on
> > >>>>>>> which plugins are use).
> > >>>>>>>
> > >>>>>>> The issues with pluggability are most visible in the System APIs.
> > >>> What
> > >>>>>>> Samza really requires to be functional is Kafka as its
> > >>>>>>> transport
> > >>>> layer.
> > >>>>>>> But
> > >>>>>>> we've conflated two unrelated use cases into one API:
> > >>>>>>>
> > >>>>>>> 1. Get data into/out of Kafka.
> > >>>>>>> 2. Process the data in Kafka.
> > >>>>>>>
> > >>>>>>> The current System API supports both of these use cases. The
> > >>>>>>> problem
> > >>>> is,
> > >>>>>>> we
> > >>>>>>> actually want different features for each use case. By papering
> > >>>>>>> over
> > >>>>> these
> > >>>>>>> two use cases, and providing a single API, we've introduced a
> > >>>>>>> ton of
> > >>>>> leaky
> > >>>>>>> abstractions.
> > >>>>>>>
> > >>>>>>> For example, what we'd really like in (2) is to have
> > >>>>>>> monotonically increasing longs for offsets (like Kafka). This
> > >>>>>>> would be at odds
> > >>> with
> > >>>>> (1),
> > >>>>>>> though, since different systems have different
> > >>>>> SCNs/Offsets/UUIDs/vectors.
> > >>>>>>> There was discussion both on the mailing list and the SQL JIRAs
> > >>> about
> > >>>>> the
> > >>>>>>> need for this.
> > >>>>>>>
> > >>>>>>> The same thing holds true for replayability. Kafka allows us to
> > >>> rewind
> > >>>>>>> when
> > >>>>>>> we have a failure. Many other systems don't. In some cases,
> > >>>>>>> systems
> > >>>>> return
> > >>>>>>> null for their offsets (e.g. WikipediaSystemConsumer) because
> > >>>>>>> they
> > >>>> have
> > >>>>> no
> > >>>>>>> offsets.
> > >>>>>>>
> > >>>>>>> Partitioning is another example. Kafka supports partitioning,
> > >>>>>>> but
> > >>> many
> > >>>>>>> systems don't. We model this by having a single partition for
> > >>>>>>> those systems. Still, other systems model partitioning
> > >> differently (e.g.
> > >>>>>>> Kinesis).
> > >>>>>>>
> > >>>>>>> The SystemAdmin interface is also a mess. Creating streams in a
> > >>>>>>> system-agnostic way is almost impossible. As is modeling
> > >>>>>>> metadata
> > >>> for
> > >>>>> the
> > >>>>>>> system (replication factor, partitions, location, etc). The
> > >>>>>>> list
> > >>> goes
> > >>>>> on.
> > >>>>>>>
> > >>>>>>> Duplicate work
> > >>>>>>>
> > >>>>>>> At the time that we began writing Samza, Kafka's consumer and
> > >>> producer
> > >>>>>>> APIs
> > >>>>>>> had a relatively weak feature set. On the consumer-side, you
> > >>>>>>> had two
> > >>>>>>> options: use the high level consumer, or the simple consumer.
> > >>>>>>> The
> > >>>>> problem
> > >>>>>>> with the high-level consumer was that it controlled your
> > >>>>>>> offsets, partition assignments, and the order in which you
> > >>>>>>> received messages. The
> > >>> problem
> > >>>>>>> with
> > >>>>>>> the simple consumer is that it's not simple. It's basic. You
> > >>>>>>> end up
> > >>>>> having
> > >>>>>>> to handle a lot of really low-level stuff that you shouldn't.
> > >>>>>>> We
> > >>>> spent a
> > >>>>>>> lot of time to make Samza's KafkaSystemConsumer very robust. It
> > >>>>>>> also allows us to support some cool features:
> > >>>>>>>
> > >>>>>>> * Per-partition message ordering and prioritization.
> > >>>>>>> * Tight control over partition assignment to support joins,
> > >>>>>>> global
> > >>>> state
> > >>>>>>> (if we want to implement it :)), etc.
> > >>>>>>> * Tight control over offset checkpointing.
> > >>>>>>>
> > >>>>>>> What we didn't realize at the time is that these features
> > >>>>>>> should
> > >>>>> actually
> > >>>>>>> be in Kafka. A lot of Kafka consumers (not just Samza stream
> > >>>> processors)
> > >>>>>>> end up wanting to do things like joins and partition
> > >>>>>>> assignment. The
> > >>>>> Kafka
> > >>>>>>> community has come to the same conclusion. They're adding a ton
> > >>>>>>> of upgrades into their new Kafka consumer implementation. To a
> > >>>>>>> large extent,
> > >>> it's
> > >>>>>>> duplicate work to what we've already done in Samza.
> > >>>>>>>
> > >>>>>>> On top of this, Kafka ended up taking a very similar approach
> > >>>>>>> to
> > >>>> Samza's
> > >>>>>>> KafkaCheckpointManager implementation for handling offset
> > >>>> checkpointing.
> > >>>>>>> Like Samza, Kafka's new offset management feature stores offset
> > >>>>>>> checkpoints in a topic, and allows you to fetch them from the
> > >>>>>>> broker.
> > >>>>>>>
> > >>>>>>> A lot of this seems like a waste, since we could have shared
> > >>>>>>> the
> > >>> work
> > >>>> if
> > >>>>>>> it
> > >>>>>>> had been done in Kafka from the get-go.
> > >>>>>>>
> > >>>>>>> Vision
> > >>>>>>>
> > >>>>>>> All of this leads me to a rather radical proposal. Samza is
> > >>> relatively
> > >>>>>>> stable at this point. I'd venture to say that we're near a 1.0
> > >>>> release.
> > >>>>>>> I'd
> > >>>>>>> like to propose that we take what we've learned, and begin
> > >>>>>>> thinking
> > >>>>> about
> > >>>>>>> Samza beyond 1.0. What would we change if we were starting from
> > >>>> scratch?
> > >>>>>>> My
> > >>>>>>> proposal is to:
> > >>>>>>>
> > >>>>>>> 1. Make Samza standalone the *only* way to run Samza
> > > >>>>>>> processors, and eliminate all direct dependencies on YARN, Mesos, etc.
> > >>>>>>> 2. Make a definitive call to support only Kafka as the stream
> > >>>> processing
> > >>>>>>> layer.
> > >>>>>>> 3. Eliminate Samza's metrics, logging, serialization, and
> > >>>>>>> config
> > >>>>> systems,
> > >>>>>>> and simply use Kafka's instead.
> > >>>>>>>
> > >>>>>>> This would fix all of the issues that I outlined above. It
> > >>>>>>> should
> > >>> also
> > >>>>>>> shrink the Samza code base pretty dramatically. Supporting only
> > >>>>>>> a standalone container will allow Samza to be executed on YARN
> > >>>>>>> (using Slider), Mesos (using Marathon/Aurora), or most other
> > >>>>>>> in-house
> > >>>>> deployment
> > >>>>>>> systems. This should make life a lot easier for new users.
> > >>>>>>> Imagine
> > >>>>> having
> > >>>>>>> the hello-samza tutorial without YARN. The drop in mailing list
> > >>>> traffic
> > >>>>>>> will be pretty dramatic.
> > >>>>>>>
> > >>>>>>> Coupling with Kafka seems long overdue to me. The reality is,
> > >>> everyone
> > >>>>>>> that
> > >>>>>>> I'm aware of is using Samza with Kafka. We basically require it
> > >>>> already
> > >>>>> in
> > >>>>>>> order for most features to work. Those that are using other
> > >>>>>>> systems
> > >>>> are
> > >>>>>>> generally using it for ingest into Kafka (1), and then they do
> > >>>>>>> the processing on top. There is already discussion (
> > >>>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851
> > >>> 767
> > >>>>>>> )
> > >>>>>>> in Kafka to make ingesting into Kafka extremely easy.
> > >>>>>>>
> > >>>>>>> Once we make the call to couple with Kafka, we can leverage a
> > >>>>>>> ton of
> > >>>>> their
> > >>>>>>> ecosystem. We no longer have to maintain our own config,
> > >>>>>>> metrics,
> > >>> etc.
> > >>>>> We
> > >>>>>>> can all share the same libraries, and make them better. This
> > >>>>>>> will
> > >>> also
> > >>>>>>> allow us to share the consumer/producer APIs, and will let us
> > >>> leverage
> > >>>>>>> their offset management and partition management, rather than
> > >>>>>>> having
> > >>>> our
> > >>>>>>> own. All of the coordinator stream code would go away, as would
> > >>>>>>> most
> > >>>> of
> > >>>>>>> the
> > >>>>>>> YARN AppMaster code. We'd probably have to push some partition
> > >>>>> management
> > >>>>>>> features into the Kafka broker, but they're already moving in
> > >>>>>>> that direction with the new consumer API. The features we have
> > >>>>>>> for
> > >>>> partition
> > >>>>>>> assignment aren't unique to Samza, and seem like they should be
> > >>>>>>> in
> > >>>> Kafka
> > >>>>>>> anyway. There will always be some niche usages which will
> > >>>>>>> require
> > >>>> extra
> > >>>>>>> care and hence full control over partition assignments much
> > >>>>>>> like the
> > >>>>> Kafka
> > >>>>>>> low level consumer api. These would continue to be supported.
> > >>>>>>>
> > >>>>>>> These items will be good for the Samza community. They'll make
> > >>>>>>> Samza easier to use, and make it easier for developers to add
> > >>>>>>> new features.
> > >>>>>>>
> > > >>>>>>> Obviously this is a fairly large (and somewhat backwards incompatible)
> > > >>>>>>> change. If we choose to go this route, it's important that we openly
> > >>>>>>> communicate how we're going to provide a migration path from
> > >>>>>>> the
> > >>>>> existing
> > >>>>>>> APIs to the new ones (if we make incompatible changes). I think
> > >>>>>>> at a minimum, we'd probably need to provide a wrapper to allow
> > >>>>>>> existing StreamTask implementations to continue running on the
> > >> new container.
> > >>>>> It's
> > >>>>>>> also important that we openly communicate about timing, and
> > >>>>>>> stages
> > >>> of
> > >>>>> the
> > >>>>>>> migration.
> > >>>>>>>
> > >>>>>>> If you made it this far, I'm sure you have opinions. :) Please
> > >>>>>>> send
> > >>>> your
> > >>>>>>> thoughts and feedback.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Chris
> > >>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>
> >
>
