Here is the link to the original prototype we started with. I wouldn't focus too heavily on the details of this code or the api, but I think it gives an idea of the lowest level api, amount of code, etc. It was basically a clone of Samza built on Kafka using the new consumer protocol just to explore the idea. https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
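To give a concrete flavor of what "lowest level api" means here, a minimal sketch of a per-record processor interface in that spirit might look like the following (the names and signatures are illustrative only, not the actual API in that branch):

// Hypothetical sketch of a minimal per-record processing API, loosely in the
// spirit of the prototype linked above; names and signatures are illustrative
// and do not reflect the actual code in that branch.
public interface StreamProcessor<K, V> {

    // Minimal context handed to the processor, letting it emit records downstream.
    interface Context<K, V> {
        void send(String topic, K key, V value);
    }

    // Called once before any records are processed.
    void init(Context<K, V> context);

    // Called once for every record consumed from the subscribed topics.
    void process(String topic, K key, V value);

    // Called periodically, e.g. for window flushes or other time-based work.
    void punctuate(long timestampMs);

    // Called on shutdown, e.g. to flush any buffered state.
    void close();
}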
I don't think we should talk about it too much because I think Guozhang and Yasu rewrote that, improved the apis, etc. But I think this does give a kind of proof that it is possible to make a relatively complete stream processing system that heavily embraces Kafka in only a few thousand lines of code.
-Jay
On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> I have updated the wiki page incorporating people's comments; please feel free to take another look before today's meeting.
> On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > Hi, Jay,
> > {quote}
> > 1. Yeah we are going to try to generalize the partition management stuff. We'll get a wiki/JIRA up for that. I think that gives what you want in terms of moving partitioning to the client side.
> > {quote}
> > Great! I am looking forward to that.
> > {quote}
> > I think the key observation is that the whole reason LinkedIn split data over clusters to begin with was because of the lack of quotas, which are in any case getting implemented.
> > {quote}
> > I am not sure that I followed this point. Is your point that with quotas, it is possible to host all data in a single cluster?
> > -Yi
> > On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps <j...@confluent.io> wrote:
> > > Hey Yi,
> > > Great points. I think for some of this the most useful thing would be to get a wip prototype out that we could discuss concretely. I think Yasuhiro and Guozhang took that prototype I had done, and had some improvements. Give us a bit to get that into understandable shape so we can discuss.
> > > To address a few of your other points:
> > > 1. Yeah we are going to try to generalize the partition management stuff. We'll get a wiki/JIRA up for that. I think that gives what you want in terms of moving partitioning to the client side.
> > > 2. I think consuming from a different cluster than the one you produce to will be easy. More than that is more complex, though I agree the pluggable partitioning makes it theoretically possible. Let's try to get something that works for the first case; it sounds like that solves the use case you describe of wanting to directly transform from a given cluster but produce back to a different cluster. I think the key observation is that the whole reason LinkedIn split data over clusters to begin with was because of the lack of quotas, which are in any case getting implemented.
> > > -Jay
> > > On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > > Hi, Jay and all,
> > > > Thanks for all your quick responses. I tried to summarize my thoughts here:
> > > > - ConsumerRecord as stream processor API:
> > > >   * This KafkaProcessor API is targeted at receiving messages from Kafka. So, to Yasuhiro's join/transformation example, any join/transformation results that are materialized in Kafka should have ConsumerRecord format (i.e. w/ topic and offsets). Any non-materialized join/transformation results should not be processed by this KafkaProcessor API. One example is the in-memory operators API in Samza, which is designed to handle the non-materialized join/transformation results. And yes, in this case, a more abstract data model is needed.
> > > >   * Just to support Jay's point of a general ConsumerRecord/ProducerRecord, general stream processing on more than one data source would need at least the following info: data source description (i.e. which topic/table), and actual data (i.e. key-value pairs). It would make sense to have the data source name as part of the general metadata in stream processing (think about it as the table name for records in standard SQL).
> > > > - SQL/DSL
> > > >   * I think that this topic itself is worthy of another KIP discussion. I would prefer to leave it out of scope in KIP-28.
> > > > - Client-side pluggable partition manager
> > > >   * Given the use cases we have seen with large-scale deployment of Samza/Kafka in LinkedIn, I would argue that we should make it a first-class citizen in this KIP. The use cases include:
> > > >     * multi-cluster Kafka
> > > >     * host-affinity (i.e. local state associated w/ certain partitions on the client)
> > > > - Multi-cluster scenario
> > > >   * Although I originally just brought it up as a use case that requires a client-side partition manager, reading Jay's comments, I realized that I have one fundamental issue w/ the current copycat + transformation model. If I interpret Jay's comment correctly, the proposed copycat+transformation plays out in the following way: i) copycat takes all data from sources (whether Kafka or non-Kafka) into *one single Kafka cluster*; ii) transformation is restricted to taking data sources in *this single Kafka cluster* to perform aggregates/joins etc. This is different from my original understanding of copycat. The main issue I have with this model is the huge data copy between Kafka clusters. In LinkedIn, we used to follow this model, using MirrorMaker to map topics from the tracking clusters to a Samza-specific Kafka cluster and only doing stream processing in the Samza-specific Kafka cluster. We moved away from this model and started allowing users to directly consume from the tracking Kafka clusters due to the overhead of copying a huge amount of traffic between Kafka clusters. I agree that the initial design of KIP-28 would probably need a smaller scope of problem to solve, hence limiting it to partition management in a single cluster. However, I would really hope the design won't prevent the use case of processing data directly from multiple clusters. In my opinion, making the partition manager client-side pluggable logic would allow us to achieve these goals.
> > > > Thanks a lot in advance!
> > > > -Yi
> > > > On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > > Hey Yi,
> > > > > For your other two points:
> > > > > - This definitely doesn't cover any kind of SQL or anything like this.
> > > > > - The prototype we started with just had process() as a method, but Yasuhiro had some ideas of adding additional filter/aggregate convenience methods. We should discuss how this would fit with the operator work you were doing in Samza.
> > > > > Probably the best way is just to get the code out there in its current state and start talking about it?
> > > > > - Your point about multiple clusters. We actually have a proposed extension for the Kafka group management protocol that would allow it to cover multiple clusters, but I think that use case is not the focus. I think in scope would be consuming from one cluster and producing to another.
> > > > > One of the assumptions we are making is that we will split into two categories:
> > > > > a. Ingress/egress, which is handled by copycat
> > > > > b. Transformation, which would be handled by this api
> > > > > I think there are a number of motivations for this:
> > > > > - It is really hard to provide hard guarantees if you allow non-trivial aggregation coupled with the ingress/egress. So if you want to be able to do something that provides a kind of end-to-end "exactly once" guarantee (that's not really the right term but what people use) I think it will be really hard to do this across multiple systems (hello two-phase commit).
> > > > > - The APIs for ingest/egress end up needing to be really different for a first-class ingestion framework.
> > > > > So the case where you have data coming from many systems, including many Kafka clusters, is just about how easy/hard it is to use copycat with the transformer api in the same program. I think this is something we should work out as part of the prototyping.
> > > > > -Jay
> > > > > On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan <nickpa...@gmail.com> wrote:
> > > > > > Hi, Guozhang,
> > > > > > Thanks for starting this. I took a quick look and had the following thoughts to share:
> > > > > > - In the proposed KafkaProcessor API, there is no interface like Collector that users can send messages to. Why is that? Is the idea to initialize the producer once and re-use it in the processor? And if there are many KStreamThreads in the process, are there going to be many instances of KafkaProducer even though all outputs are sent to the same Kafka cluster?
> > > > > > - Won't it be simpler if the process() API just takes in the ConsumerRecord as the input instead of a tuple of (topic, key, value)?
> > > > > > - Also, the input only indicates the topic of a message. What if the stream task needs to consume and produce messages from/to multiple Kafka clusters? To support that case, there should be a system/cluster name in both input and output as well.
> > > > > > - How are the output messages handled? There does not seem to be an interface that allows the user to send output messages to multiple output Kafka clusters.
> > > > > > - It seems the proposed model also assumes one thread per processor. What becomes thread-local and what is shared among processors? Is the proposed model targeting to have the consumers/producers become thread-local instances within each KafkaProcessor?
> > > > > > What's the cost associated with this model?
> > > > > > - One more important issue: how do we plug in client-side partition management logic? Considering the use case where the stream task needs to consume from multiple Kafka clusters, I am not even sure that we can rely on the Kafka broker to maintain the consumer group membership. Maybe we can still get the per-cluster consumer group membership and partitions. However, in this case, we truly need client-side pluggable partition management logic to determine how to assign partitions in different Kafka clusters to consumers (i.e. consumers for cluster1.topic1.p1 and cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for processing). Based on the full information about (group members, all topic partitions) in all Kafka clusters with input topics, there should be two levels of partition management policies: a) how to group all topic partitions in all Kafka clusters into processor groups (i.e. the same concept as a Task group in Samza); b) how to assign the processor groups to group members. Note that if a processor group includes topic partitions from more than one Kafka cluster, it has to be assigned to the common group members in all relevant Kafka clusters. This cannot be done just by the brokers in a single Kafka cluster.
> > > > > > - It seems that the intention of this KIP is also to put SQL/DSL libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding system-level integration details and leave it open for additional modules outside the Kafka core to enrich the user-facing functionality?
> > > > > > Just a few quick cents. Thanks a lot!
> > > > > > -Yi
> > > > > > On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > > Ewen:
> > > > > > > > * I think trivial filtering and aggregation on a single stream usually work fine with this model.
> > > > > > > The way I see this, the process() API is an abstraction for message-at-a-time computations. In the future, you could imagine providing a simple DSL layer on top of the process() API that provides a set of APIs for stream processing operations on sets of messages like joins, windows and various aggregations.
> > > > > > > > * Spark (and presumably spark streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk in the background. Will we take a hit because we always write to Kafka?
> > > > > > > The goal isn't so much about forcing materialization of intermediate results into Kafka but designing the API to integrate with Kafka to allow such materialization, wherever that might be required. The downside with other stream processing frameworks is that they have weak integration with Kafka, where interaction with Kafka is only at the endpoints of processing (first input, final output). Any intermediate operations that might benefit from persisting intermediate results into Kafka are forced to be broken up into 2 separate topologies/plans/stages of processing that lead to more jobs. The implication is that the set of stream processing operations that should really have lived in one job per application is now split up across several piecemeal jobs that need to be monitored, managed and operated separately. The APIs should still allow in-memory storage of intermediate results where they make sense.
> > > > > > > Jiangjie,
> > > > > > > > I just took a quick look at the KIP; is it very similar to mirror maker with a message handler?
> > > > > > > Not really. I wouldn't say it is similar, but mirror maker is a special instance of using copycat with Kafka source, sink + optionally the process() API. I can imagine replacing the MirrorMaker, in due course, with copycat + process().
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > > On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > > Hey Guozhang,
> > > > > > > > I just took a quick look at the KIP; is it very similar to mirror maker with a message handler?
> > > > > > > > Thanks,
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > > > > > > > Just some notes on the KIP doc itself:
> > > > > > > > > * It'd be useful to clarify at what point the plain consumer + custom code + producer breaks down. I think trivial filtering and aggregation on a single stream usually work fine with this model. Anything where you need more complex joins, windowing, etc. is where it breaks down. I think most interesting applications require that functionality, but it's helpful to make this really clear in the motivation -- right now, Kafka only provides the lowest level plumbing for stream processing applications, so most interesting apps require very heavyweight frameworks.
> > > > > > > > > * I think the feature comparison of plain producer/consumer, stream processing frameworks, and this new library is a good start, but we might want something more thorough and structured, like a feature matrix. Right now it's hard to figure out exactly how they relate to each other.
> > > > > > > > > * I'd personally push the library vs. framework story very strongly -- the total buy-in and weak integration story of stream processing frameworks is a big downside and makes a library a really compelling (and currently unavailable, as far as I am aware) alternative.
> > > > > > > > > * The comment about in-memory storage in other frameworks is interesting -- it is specific to the framework, but is supposed to also give performance benefits. The high-level functional processing interface would allow for combining multiple operations when there's no shuffle, but when there is a shuffle, we'll always be writing to Kafka, right? Spark (and presumably spark streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk in the background. Will we take a hit because we always write to Kafka?
> > > > > > > > > * I really struggled with the structure of the KIP template with Copycat because the flow doesn't work well for proposals like this. They aren't the kind of concrete changes that the KIP template was designed for. I'd completely ignore that template in favor of optimizing for clarity if I were you.
> > > > > > > > > -Ewen
> > > > > > > > > On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > Hi all,
> > > > > > > > > > I just posted KIP-28: Add a transform client for data processing <https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing>.
> > > > > > > > > > The wiki page does not yet have the full design / implementation details, and this email is to kick off the conversation on whether we should add this new client with the described motivations, and if so, what features / functionalities should be included.
> > > > > > > > > > Looking forward to your feedback!
> > > > > > > > > > -- Guozhang
> > > > > > > > > --
> > > > > > > > > Thanks,
> > > > > > > > > Ewen
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
> --
> -- Guozhang
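For reference, the "plain consumer + custom code + producer" baseline that Ewen refers to above looks roughly like the sketch below using the new Java clients. This is only an illustration (the topic names, configs, and filter are made up, and exact client method signatures were still in flux while the new consumer was being finalized): it works fine for trivial per-record filtering, but offers nothing for joins, windows, or repartitioning, which is the gap the proposed client is meant to fill.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Properties;

public class FilterCopyJob {
    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092");
        config.put("group.id", "filter-copy");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
        KafkaProducer<String, String> producer = new KafkaProducer<>(config);
        consumer.subscribe(Collections.singletonList("page-views"));

        // Consume, apply trivial per-record logic, and produce the result back to
        // Kafka. Anything stateful (joins, windows, aggregates) has no support here.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                if (record.value() != null) {  // trivial stateless filter
                    producer.send(new ProducerRecord<>("page-views-filtered",
                                                       record.key(), record.value()));
                }
            }
        }
    }
}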