Here is the link to the original prototype we started with. I wouldn't
focus too heavily on the details of this code or the API, but I think it
gives an idea of the lowest-level API, the amount of code, etc. It was
basically a clone of Samza built on Kafka using the new consumer protocol,
just to explore the idea.
https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming

I don't think we should spend too much time on it, because I think Guozhang
and Yasu rewrote that, improved the APIs, etc. But I think this does give a
kind of proof that it is possible to build a relatively complete stream
processing system in only a few thousand lines of code by heavily
embracing Kafka.
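
For anyone skimming the thread without opening the link, here is a rough
sketch of the general shape of a message-at-a-time process() API like the
one discussed below. It is only an illustration: the names Processor,
Collector, ListCollector and ProcessorSketch are hypothetical and are not
taken from the prototype or from KIP-28, and an in-memory list stands in
for a real producer.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorSketch {

    /** Hypothetical output hook, in the role of the "Collector" asked about in the thread. */
    interface Collector<K, V> {
        void send(String topic, K key, V value);
    }

    /** Hypothetical message-at-a-time processor: called once per input record. */
    interface Processor<K, V> {
        void process(String topic, K key, V value, Collector<K, V> out);
    }

    /** In-memory collector that records what was sent; a stand-in for a producer. */
    static final class ListCollector implements Collector<String, String> {
        final List<String> sent = new ArrayList<>();

        @Override
        public void send(String topic, String key, String value) {
            sent.add(topic + ":" + key + "=" + value);
        }
    }

    /** Feeds a few hard-coded records through a simple filter and returns what was forwarded. */
    static List<String> run() {
        // Trivial single-stream filter: forward only non-empty values to an output topic.
        Processor<String, String> filter = (topic, key, value, out) -> {
            if (!value.isEmpty()) {
                out.send(topic + "-filtered", key, value);
            }
        };

        ListCollector collector = new ListCollector();
        String[][] records = { {"k1", "a"}, {"k2", ""}, {"k3", "c"} };
        for (String[] r : records) {
            filter.process("events", r[0], r[1], collector);
        }
        return collector.sent;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Passing the collector into process() is just one possible choice; whether
output goes through a collector, a shared producer, or something else is
exactly the kind of question raised further down the thread.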

-Jay

On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I have updated the wiki page incorporating people's comments, please feel
> free to take another look before today's meeting.
>
> On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Jay,
> >
> > {quote}
> > 1. Yeah we are going to try to generalize the partition management stuff.
> > We'll get a wiki/JIRA up for that. I think that gives what you want in
> > terms of moving partitioning to the client side.
> > {quote}
> > Great! I am looking forward to that.
> >
> > {quote}
> > I think the key observation is that the whole reason
> > LinkedIn split data over clusters to begin with was because of the lack
> > of quotas, which are in any case getting implemented.
> > {quote}
> > I am not sure that I followed this point. Is your point that with quotas,
> > it is possible to host all data in a single cluster?
> >
> > -Yi
> >
> > On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Hey Yi,
> > >
> > > Great points. I think for some of this the most useful thing would be
> > > to get a WIP prototype out that we could discuss concretely. I think
> > > Yasuhiro and Guozhang took that prototype I had done, and had some
> > > improvements. Give us a bit to get that into understandable shape so we
> > > can discuss.
> > >
> > > To address a few of your other points:
> > > 1. Yeah we are going to try to generalize the partition management
> > > stuff. We'll get a wiki/JIRA up for that. I think that gives what you
> > > want in terms of moving partitioning to the client side.
> > > 2. I think consuming from a different cluster than you produce to will
> > > be easy. More than that is more complex, though I agree the pluggable
> > > partitioning makes it theoretically possible. Let's try to get
> > > something that works for the first case; it sounds like that solves the
> > > use case you describe of wanting to directly transform from a given
> > > cluster but produce back to a different cluster. I think the key
> > > observation is that the whole reason LinkedIn split data over clusters
> > > to begin with was because of the lack of quotas, which are in any case
> > > getting implemented.
> > >
> > > -Jay
> > >
> > > On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > >
> > > > Hi, Jay and all,
> > > >
> > > > Thanks for all your quick responses. I tried to summarize my thoughts
> > > here:
> > > >
> > > > - ConsumerRecord as stream processor API:
> > > >
> > > > >    * This KafkaProcessor API is targeted to receive the message from
> > > > > Kafka. So, to Yasuhiro's join/transformation example, any
> > > > > join/transformation results that are materialized in Kafka should
> > > > > have ConsumerRecord format (i.e. w/ topic and offsets). Any
> > > > > non-materialized join/transformation results should not be processed
> > > > > by this KafkaProcessor API. One example is the in-memory operators
> > > > > API in Samza, which is designed to handle the non-materialized
> > > > > join/transformation results. And yes, in this case, a more abstract
> > > > > data model is needed.
> > > >
> > > >    * Just to support Jay's point of a general
> > > > ConsumerRecord/ProducerRecord, a general stream processing on more
> than
> > > one
> > > > data sources would need at least the following info: data source
> > > > description (i.e. which topic/table), and actual data (i.e. key-value
> > > > pairs). It would make sense to have the data source name as part of
> the
> > > > general metadata in stream processing (think about it as the table
> name
> > > for
> > > > records in standard SQL).
> > > >
> > > > - SQL/DSL
> > > >
> > > >    * I think that this topic itself is worthy of another KIP
> > discussion.
> > > I
> > > > would prefer to leave it out of scope in KIP-28.
> > > >
> > > > - Client-side pluggable partition manager
> > > >
> > > >    * Given the use cases we have seen with large-scale deployment of
> > > > Samza/Kafka in LinkedIn, I would argue that we should make it as the
> > > > first-class citizen in this KIP. The use cases include:
> > > >
> > > >       * multi-cluster Kafka
> > > >
> > > >       * host-affinity (i.e. local-state associated w/ certain
> > partitions
> > > on
> > > > client)
> > > >
> > > > - Multi-cluster scenario
> > > >
> > > > >    * Although I originally just brought it up as a use case that
> > > > > requires client-side partition manager, reading Jay’s comments, I
> > > > > realized that I have one fundamental issue w/ the current copycat +
> > > > > transformation model. If I interpret Jay’s comment correctly, the
> > > > > proposed copycat+transformation plays out in the following way: i)
> > > > > copycat takes all data from sources (no matter whether they are Kafka
> > > > > or non-Kafka) into *one single Kafka cluster*; ii) transformation is
> > > > > restricted to taking data sources in *this single Kafka cluster* to
> > > > > perform aggregate/join etc. This is different from my original
> > > > > understanding of the copycat. The main issue I have with this model
> > > > > is: huge data-copy between Kafka clusters. In LinkedIn, we used to
> > > > > follow this model, using MirrorMaker to map topics from tracking
> > > > > clusters to a Samza-specific Kafka cluster and only doing stream
> > > > > processing in the Samza-specific Kafka cluster. We moved away from
> > > > > this model and started allowing users to directly consume from
> > > > > tracking Kafka clusters due to the overhead of copying a huge amount
> > > > > of traffic between Kafka clusters. I agree that the initial design of
> > > > > KIP-28 would probably need a smaller scope of problem to solve, hence
> > > > > limiting it to solving partition management in a single cluster.
> > > > > However, I would really hope the design won’t prevent the use case of
> > > > > processing data directly from multiple clusters. In my opinion,
> > > > > making the partition manager a client-side pluggable logic would
> > > > > allow us to achieve these goals.
> > > >
> > > > Thanks a lot in advance!
> > > >
> > > > -Yi
> > > >
> > > > On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps <j...@confluent.io>
> wrote:
> > > >
> > > > > Hey Yi,
> > > > >
> > > > > For your other two points:
> > > > >
> > > > > - This definitely doesn't cover any kind of SQL or anything like
> > this.
> > > > >
> > > > > - The prototype we started with just had process() as a method but
> > > > Yasuhiro
> > > > > had some ideas of adding additional filter/aggregate convenience
> > > methods.
> > > > > We should discuss how this would fit with the operator work you
> were
> > > > doing
> > > > > in Samza. Probably the best way is just get the code out there in
> > > current
> > > > > state and start talking about it?
> > > > >
> > > > > - Your point about multiple clusters. We actually have a proposed
> > > > extension
> > > > > for the Kafka group management protocol that would allow it to
> cover
> > > > > multiple clusters but actually I think that use case is not the
> > focus.
> > > I
> > > > > think in scope would be consuming from one cluster and producing to
> > > > > another.
> > > > >
> > > > > One of the assumptions we are making is that we will split into two
> > > > > categories:
> > > > > a. Ingress/egress which is handled by copycat
> > > > > b. Transformation which would be handled by this api
> > > > >
> > > > > I think there are a number of motivations for this
> > > > > - It is really hard to provide hard guarantees if you allow
> > non-trivial
> > > > > aggregation coupled with the ingress/egress. So if you want to be
> > able
> > > to
> > > > > do something that provides a kind of end-to-end "exactly once"
> > > guarantee
> > > > > (that's not really the right term but what people use) I think it
> > will
> > > be
> > > > > really hard to do this across multiple systems (hello two-phase
> > commit)
> > > > > - The APIs for ingest/egress end up needing to be really different
> > for
> > > a
> > > > > first-class ingestion framework
> > > > >
> > > > > So the case where you have data coming from many systems including
> > many
> > > > > Kafka clusters is just about how easy/hard it is to use copycat
> with
> > > the
> > > > > transformer api in the same program. I think this is something we
> > > should
> > > > > work out as part of the prototyping.
> > > > >
> > > > > -Jay
> > > > >
> > > > > On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan <nickpa...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi, Guozhang,
> > > > > >
> > > > > > Thanks for starting this. I took a quick look and had the
> following
> > > > > > thoughts to share:
> > > > > >
> > > > > > - In the proposed KafkaProcessor API, there is no interface like
> > > > > Collector
> > > > > > that allows users to send messages to. Why is that? Is the idea
> to
> > > > > > initialize the producer once and re-use it in the processor? And
> if
> > > > there
> > > > > > are many KStreamThreads in the process, are there going to be
> many
> > > > > > instances of KafkaProducer although all outputs are sending to
> the
> > > same
> > > > > > Kafka cluster?
> > > > > >
> > > > > > - Won’t it be simpler if the process() API just takes in the
> > > > > ConsumerRecord
> > > > > > as the input instead of a tuple of (topic, key, value)?
> > > > > >
> > > > > > - Also, the input only indicates the topic of a message. What if
> > the
> > > > > stream
> > > > > > task needs to consume and produce messages from/to multiple Kafka
> > > > > clusters?
> > > > > > To support that case, there should be a system/cluster name in
> both
> > > > input
> > > > > > and output as well.
> > > > > >
> > > > > > > - How are the output messages handled? There does not seem to be an
> > > > > > > interface that allows the user to send output messages to multiple
> > > > > > > output Kafka clusters.
> > > > > >
> > > > > > - It seems the proposed model also assumes one thread per
> > processor.
> > > > What
> > > > > > becomes thread-local and what are shared among processors? Is the
> > > > > proposed
> > > > > > model targeting to have the consumers/producers become
> thread-local
> > > > > > instances within each KafkaProcessor? What’s the cost associated
> > with
> > > > > this
> > > > > > model?
> > > > > >
> > > > > > > - One more important issue: how do we plug in client-side partition
> > > > > > > management logic? Considering the use case where the stream task
> > > > > > > needs to consume from multiple Kafka clusters, I am not even sure
> > > > > > > that we can rely on the Kafka broker to maintain the consumer group
> > > > > > > membership. Maybe we can still get the per-cluster consumer group
> > > > > > > membership and partitions. However, in this case, we truly need
> > > > > > > client-side pluggable partition management logic to determine how to
> > > > > > > assign partitions in different Kafka clusters to consumers (i.e.
> > > > > > > consumers for cluster1.topic1.p1 and cluster2.topic2.p1 have to be
> > > > > > > assigned together to one KafkaProcessor for processing). Based on
> > > > > > > the full information about (group members, all topic partitions) in
> > > > > > > all Kafka clusters with input topics, there should be two levels of
> > > > > > > partition management policies: a) how to group all topic partitions
> > > > > > > in all Kafka clusters into processor groups (i.e. the same concept
> > > > > > > as Task group in Samza); b) how to assign the processor groups to
> > > > > > > group members. Note that if a processor group includes topic
> > > > > > > partitions from more than one Kafka cluster, it has to be assigned
> > > > > > > to the common group members in all relevant Kafka clusters. This
> > > > > > > cannot be done just by the brokers in a single Kafka cluster.
> > > > > >
> > > > > > > - It seems that the intention of this KIP is also to put SQL/DSL
> > > > > > > libraries into Kafka. Why is that? Shouldn't Kafka be more focused
> > > > > > > on hiding system-level integration details and leave it open for any
> > > > > > > additional modules outside the Kafka core to enrich the
> > > > > > > user-facing functionality?
> > > > > >
> > > > > > Just a few quick cents. Thanks a lot!
> > > > > >
> > > > > > -Yi
> > > > > >
> > > > > > On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede <
> n...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Ewen:
> > > > > > >
> > > > > > > * I think trivial filtering and aggregation on a single stream
> > > > usually
> > > > > > work
> > > > > > > > fine with this model.
> > > > > > >
> > > > > > >
> > > > > > > The way I see this, the process() API is an abstraction for
> > > > > > > message-at-a-time computations. In the future, you could
> imagine
> > > > > > providing
> > > > > > > a simple DSL layer on top of the process() API that provides a
> > set
> > > of
> > > > > > APIs
> > > > > > > for stream processing operations on sets of messages like
> joins,
> > > > > windows
> > > > > > > and various aggregations.
> > > > > > >
> > > > > > > * Spark (and presumably
> > > > > > > > spark streaming) is supposed to get a big win by handling
> > > shuffles
> > > > > such
> > > > > > > > that the data just stays in cache and never actually hits
> disk,
> > > or
> > > > at
> > > > > > > least
> > > > > > > > hits disk in the background. Will we take a hit because we
> > always
> > > > > write
> > > > > > > to
> > > > > > > > Kafka?
> > > > > > >
> > > > > > >
> > > > > > > The goal isn't so much about forcing materialization of
> > > intermediate
> > > > > > > results into Kafka but designing the API to integrate with
> Kafka
> > to
> > > > > allow
> > > > > > > such materialization, wherever that might be required. The
> > downside
> > > > > with
> > > > > > > other stream processing frameworks is that they have weak
> > > integration
> > > > > > with
> > > > > > > Kafka where interaction with Kafka is only at the endpoints of
> > > > > processing
> > > > > > > (first input, final output). Any intermediate operations that
> > might
> > > > > > benefit
> > > > > > > from persisting intermediate results into Kafka are forced to
> be
> > > > broken
> > > > > > up
> > > > > > > into 2 separate topologies/plans/stages of processing that lead
> > to
> > > > more
> > > > > > > jobs. The implication is that now the set of stream processing
> > > > > operations
> > > > > > > that should really have lived in one job per application is now
> > > split
> > > > > up
> > > > > > > across several piecemeal jobs that need to be monitored,
> managed
> > > and
> > > > > > > > operated separately. The APIs should still allow in-memory storage
> > > > > > > > of intermediate results where they make sense.
> > > > > > >
> > > > > > > Jiangjie,
> > > > > > >
> > > > > > > I just took a quick look at the KIP, is it very similar to
> mirror
> > > > maker
> > > > > > > > with message handler?
> > > > > > >
> > > > > > >
> > > > > > > Not really. I wouldn't say it is similar, but mirror maker is a
> > > > special
> > > > > > > instance of using copycat with Kafka source, sink + optionally
> > the
> > > > > > > process() API. I can imagine replacing the MirrorMaker, in the
> > due
> > > > > course
> > > > > > > of time, with copycat + process().
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin
> > > > > <j...@linkedin.com.invalid
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Guozhang,
> > > > > > > >
> > > > > > > > I just took a quick look at the KIP, is it very similar to
> > mirror
> > > > > maker
> > > > > > > > with message handler?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava <
> > > > > > > e...@confluent.io
> > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Just some notes on the KIP doc itself:
> > > > > > > > >
> > > > > > > > > > * It'd be useful to clarify at what point the plain consumer +
> > > > > > > > > > custom code + producer breaks down. I think trivial filtering and
> > > > > > > > > > aggregation on a single stream usually work fine with this model.
> > > > > > > > > > Anything where you need more complex joins, windowing, etc. is
> > > > > > > > > > where it breaks down. I think most interesting applications
> > > > > > > > > > require that functionality, but it's helpful to make this really
> > > > > > > > > > clear in the motivation -- right now, Kafka only provides the
> > > > > > > > > > lowest level plumbing for stream processing applications, so most
> > > > > > > > > > interesting apps require very heavyweight frameworks.
> > > > > > > > > * I think the feature comparison of plain
> producer/consumer,
> > > > stream
> > > > > > > > > processing frameworks, and this new library is a good
> start,
> > > but
> > > > we
> > > > > > > might
> > > > > > > > > want something more thorough and structured, like a feature
> > > > matrix.
> > > > > > > Right
> > > > > > > > > now it's hard to figure out exactly how they relate to each
> > > > other.
> > > > > > > > > * I'd personally push the library vs. framework story very
> > > > strongly
> > > > > > --
> > > > > > > > the
> > > > > > > > > total buy-in and weak integration story of stream
> processing
> > > > > > frameworks
> > > > > > > > is
> > > > > > > > > a big downside and makes a library a really compelling (and
> > > > > currently
> > > > > > > > > unavailable, as far as I am aware) alternative.
> > > > > > > > > * Comment about in-memory storage of other frameworks is
> > > > > interesting
> > > > > > --
> > > > > > > > it
> > > > > > > > > is specific to the framework, but is supposed to also give
> > > > > > performance
> > > > > > > > > benefits. The high-level functional processing interface
> > would
> > > > > allow
> > > > > > > for
> > > > > > > > > combining multiple operations when there's no shuffle, but
> > when
> > > > > there
> > > > > > > is
> > > > > > > > a
> > > > > > > > > shuffle, we'll always be writing to Kafka, right? Spark
> (and
> > > > > > presumably
> > > > > > > > > spark streaming) is supposed to get a big win by handling
> > > > shuffles
> > > > > > such
> > > > > > > > > that the data just stays in cache and never actually hits
> > disk,
> > > > or
> > > > > at
> > > > > > > > least
> > > > > > > > > hits disk in the background. Will we take a hit because we
> > > always
> > > > > > write
> > > > > > > > to
> > > > > > > > > Kafka?
> > > > > > > > > * I really struggled with the structure of the KIP template
> > > with
> > > > > > > Copycat
> > > > > > > > > because the flow doesn't work well for proposals like this.
> > > They
> > > > > > aren't
> > > > > > > > as
> > > > > > > > > concrete changes as the KIP template was designed for. I'd
> > > > > completely
> > > > > > > > > ignore that template in favor of optimizing for clarity if
> I
> > > were
> > > > > > you.
> > > > > > > > >
> > > > > > > > > -Ewen
> > > > > > > > >
> > > > > > > > > On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <
> > > > wangg...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > > I just posted KIP-28: Add a transform client for data processing
> > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing>.
> > > > > > > > > >
> > > > > > > > > > The wiki page does not yet have the full design /
> > > > implementation
> > > > > > > > details,
> > > > > > > > > > and this email is to kick-off the conversation on whether
> > we
> > > > > should
> > > > > > > add
> > > > > > > > > > this new client with the described motivations, and if
> yes
> > > what
> > > > > > > > features
> > > > > > > > > /
> > > > > > > > > > functionalities should be included.
> > > > > > > > > >
> > > > > > > > > > Looking forward to your feedback!
> > > > > > > > > >
> > > > > > > > > > -- Guozhang
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Thanks,
> > > > > > > > > Ewen
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
