Just a quick ping that, regardless of the name of the thing, I'm still interested in answers to my questions :)
On Tue, Jul 28, 2015 at 3:07 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

Thanks Guozhang! Much clearer now, at least for me.

A few comments / questions:

1. Perhaps punctuate(int numRecords) would be a nice API addition; some use cases have record-count-based windows rather than time-based ones.
2. The diagram for "Flexible partition distribution" shows two joins. Is the idea to implement two Processors and string them together?
3. Is the local state persistent? Can you talk a bit about how local state works with high availability?

Gwen
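To make Gwen's first suggestion concrete, a count-based punctuate might look roughly like the sketch below. The KafkaProcessor interface shown here is purely illustrative (the actual KIP-28 API is still being designed); only the punctuate(int numRecords) signature comes from Gwen's proposal.

    // Hypothetical sketch -- names and signatures are illustrative, not the KIP-28 API.
    interface KafkaProcessor<K, V> {
        void process(String topic, K key, V value); // per the KIP's (topic, key, value) input
        void punctuate(int numRecords);             // Gwen's proposed count-based trigger
    }

    // A processor whose window boundary fires every 1000 records rather than on a timer.
    class CountWindowProcessor implements KafkaProcessor<String, Long> {
        private static final int WINDOW_SIZE = 1000;
        private long count = 0;

        @Override
        public void process(String topic, String key, Long value) {
            count++;
            // ... accumulate window state here ...
            if (count % WINDOW_SIZE == 0) {
                punctuate(WINDOW_SIZE); // close the current record-count window
            }
        }

        @Override
        public void punctuate(int numRecords) {
            // emit results for the last numRecords records, then reset window state
        }
    }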
On Tue, Jul 28, 2015 at 12:57 AM, Guozhang Wang <wangg...@gmail.com> wrote:

I have updated the wiki page incorporating people's comments; please feel free to take another look before today's meeting.

-- Guozhang

On Mon, Jul 27, 2015 at 11:19 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Jay,

{quote}
1. Yeah we are going to try to generalize the partition management stuff. We'll get a wiki/JIRA up for that. I think that gives what you want in terms of moving partitioning to the client side.
{quote}

Great! I am looking forward to that.

{quote}
I think the key observation is that the whole reason LinkedIn split data over clusters to begin with was because of the lack of quotas, which are in any case getting implemented.
{quote}

I am not sure I followed this point. Is your point that with quotas, it is possible to host all data in a single cluster?

-Yi

On Mon, Jul 27, 2015 at 8:53 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Yi,

Great points. I think for some of this the most useful thing would be to get a WIP prototype out that we could discuss concretely. I think Yasuhiro and Guozhang took that prototype I had done and made some improvements. Give us a bit to get that into understandable shape so we can discuss.

To address a few of your other points:

1. Yeah, we are going to try to generalize the partition management stuff. We'll get a wiki/JIRA up for that. I think that gives what you want in terms of moving partitioning to the client side.

2. I think consuming from a different cluster than the one you produce to will be easy. More than that is more complex, though I agree the pluggable partitioning makes it theoretically possible. Let's try to get something that works for the first case; it sounds like that solves the use case you describe of wanting to directly transform from a given cluster but produce back to a different cluster. I think the key observation is that the whole reason LinkedIn split data over clusters to begin with was the lack of quotas, which are in any case getting implemented.

-Jay

On Sun, Jul 26, 2015 at 11:31 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Jay and all,

Thanks for all your quick responses. I tried to summarize my thoughts here:

- ConsumerRecord as stream processor API:

  * This KafkaProcessor API is targeted at receiving messages from Kafka. So, to Yasuhiro's join/transformation example, any join/transformation results that are materialized in Kafka should have ConsumerRecord format (i.e. w/ topic and offsets). Any non-materialized join/transformation results should not be processed by this KafkaProcessor API. One example is the in-memory operators API in Samza, which is designed to handle non-materialized join/transformation results. And yes, in this case, a more abstract data model is needed.

  * Just to support Jay's point of a general ConsumerRecord/ProducerRecord: general stream processing on more than one data source would need at least the following info: a data source description (i.e. which topic/table) and the actual data (i.e. key-value pairs). It would make sense to have the data source name as part of the general metadata in stream processing (think of it as the table name for records in standard SQL).

- SQL/DSL

  * I think this topic is worthy of another KIP discussion in its own right. I would prefer to leave it out of scope for KIP-28.

- Client-side pluggable partition manager

  * Given the use cases we have seen with large-scale deployments of Samza/Kafka at LinkedIn, I would argue that we should make it a first-class citizen in this KIP. The use cases include:

    * multi-cluster Kafka

    * host affinity (i.e. local state associated w/ certain partitions on the client)

- Multi-cluster scenario

  * Although I originally just brought it up as a use case that requires a client-side partition manager, reading Jay's comments I realized that I have one fundamental issue w/ the current copycat + transformation model. If I interpret Jay's comment correctly, the proposed copycat + transformation plays out in the following way: i) copycat takes all data from sources (no matter whether they are Kafka or non-Kafka) into *one single Kafka cluster*; ii) transformation is restricted to taking data sources in *this single Kafka cluster* to perform aggregates/joins etc. This is different from my original understanding of copycat. The main issue I have with this model is the huge data copy between Kafka clusters. At LinkedIn, we used to follow this model, using MirrorMaker to map topics from tracking clusters to a Samza-specific Kafka cluster and doing stream processing only in the Samza-specific Kafka cluster. We moved away from this model and started allowing users to directly consume from the tracking Kafka clusters, due to the overhead of copying huge amounts of traffic between Kafka clusters. I agree that the initial design of KIP-28 would probably need a smaller scope of problems to solve, hence limiting it to partition management in a single cluster. However, I would really hope the design won't prevent the use case of processing data directly from multiple clusters. In my opinion, making the partition manager client-side pluggable logic would allow us to achieve these goals.

Thanks a lot in advance!

-Yi
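To illustrate the client-side pluggable partition manager Yi is arguing for, here is a minimal sketch of the two policy levels he describes; every name below is invented for illustration, not a real or proposed API.

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // A partition qualified by the cluster it lives in, e.g. cluster1.topic1.p1.
    class ClusterTopicPartition {
        final String cluster;
        final String topic;
        final int partition;

        ClusterTopicPartition(String cluster, String topic, int partition) {
            this.cluster = cluster;
            this.topic = topic;
            this.partition = partition;
        }
    }

    interface PartitionManager {
        // Level (a): group topic partitions -- possibly across clusters -- into
        // processor groups (the same concept as a Task group in Samza), e.g.
        // co-grouping cluster1.topic1.p1 with cluster2.topic2.p1.
        List<Set<ClusterTopicPartition>> groupPartitions(Set<ClusterTopicPartition> allPartitions);

        // Level (b): assign processor groups to group members. A group that spans
        // clusters must land on a member common to all relevant clusters' groups,
        // which is exactly what a single cluster's brokers cannot decide alone.
        Map<String, List<Set<ClusterTopicPartition>>> assign(
                List<String> memberIds,
                List<Set<ClusterTopicPartition>> processorGroups);
    }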
On Fri, Jul 24, 2015 at 11:13 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Yi,

For your other two points:

- This definitely doesn't cover any kind of SQL or anything like that.

- The prototype we started with just had process() as a method, but Yasuhiro had some ideas about adding additional filter/aggregate convenience methods. We should discuss how this would fit with the operator work you were doing in Samza. Probably the best way is just to get the code out there in its current state and start talking about it?

- Your point about multiple clusters: we actually have a proposed extension for the Kafka group management protocol that would allow it to cover multiple clusters, but I think that use case is not the focus. I think in scope would be consuming from one cluster and producing to another.

One of the assumptions we are making is that we will split into two categories:
a. Ingress/egress, which is handled by copycat
b. Transformation, which would be handled by this API

I think there are a number of motivations for this:
- It is really hard to provide hard guarantees if you allow non-trivial aggregation coupled with the ingress/egress. So if you want to be able to do something that provides a kind of end-to-end "exactly once" guarantee (that's not really the right term, but it's what people use), I think it will be really hard to do across multiple systems (hello, two-phase commit).
- The APIs for ingest/egress end up needing to be really different for a first-class ingestion framework.

So the case where you have data coming from many systems, including many Kafka clusters, is just about how easy/hard it is to use copycat with the transformer API in the same program. I think this is something we should work out as part of the prototyping.

-Jay
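The in-scope case Jay describes -- consuming from one cluster and producing to another -- is already expressible with the standard KafkaConsumer/KafkaProducer clients. A minimal sketch, with hypothetical cluster/topic names, and offset management and error handling omitted for brevity:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CrossClusterTransform {
        public static void main(String[] args) {
            Properties cprops = new Properties();
            cprops.put("bootstrap.servers", "cluster-a:9092"); // consume from cluster A
            cprops.put("group.id", "transformer");
            cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties pprops = new Properties();
            pprops.put("bootstrap.servers", "cluster-b:9092"); // produce to cluster B
            pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pprops)) {
                consumer.subscribe(Collections.singletonList("input-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // trivial per-record transformation before producing to the other cluster
                        producer.send(new ProducerRecord<>("output-topic",
                                record.key(), record.value().toUpperCase()));
                    }
                }
            }
        }
    }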
On Fri, Jul 24, 2015 at 12:57 AM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Guozhang,

Thanks for starting this. I took a quick look and had the following thoughts to share:

- In the proposed KafkaProcessor API, there is no interface like a Collector that allows users to send messages. Why is that? Is the idea to initialize the producer once and re-use it in the processor? And if there are many KStreamThreads in the process, are there going to be many instances of KafkaProducer, even though all outputs are sent to the same Kafka cluster?

- Wouldn't it be simpler if the process() API just took the ConsumerRecord as the input instead of a tuple of (topic, key, value)?

- Also, the input only indicates the topic of a message. What if the stream task needs to consume and produce messages from/to multiple Kafka clusters? To support that case, there should be a system/cluster name in both input and output as well.

- How are the output messages handled? There does not seem to be an interface that allows a user to send output messages to multiple output Kafka clusters.

- It seems the proposed model also assumes one thread per processor. What becomes thread-local and what is shared among processors? Is the proposed model targeting to have the consumers/producers become thread-local instances within each KafkaProcessor? What's the cost associated with this model?

- One more important issue: how do we plug in client-side partition management logic? Considering the use case where the stream task needs to consume from multiple Kafka clusters, I am not even sure that we can rely on the Kafka broker to maintain the consumer group membership. Maybe we can still get the per-cluster consumer group membership and partitions. However, in this case, we truly need client-side pluggable partition management logic to determine how to assign partitions in different Kafka clusters to consumers (i.e. the consumers for cluster1.topic1.p1 and cluster2.topic2.p1 have to be assigned together to one KafkaProcessor for processing). Based on the full information about (group members, all topic partitions) in all Kafka clusters with input topics, there should be two levels of partition management policies: a) how to group all topic partitions in all Kafka clusters into processor groups (i.e. the same concept as a Task group in Samza); b) how to assign the processor groups to group members. Note that if a processor group includes topic partitions from more than one Kafka cluster, it has to be assigned to the common group members in all the relevant Kafka clusters. This cannot be done just by the brokers in a single Kafka cluster.

- It seems that the intention of this KIP is also to put SQL/DSL libraries into Kafka. Why is that? Shouldn't Kafka be more focused on hiding system-level integration details and leave it open for additional modules outside the Kafka core to enrich the user-facing functionality?

Just a few quick cents. Thanks a lot!

-Yi
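To make Yi's first two questions concrete, the shape he seems to be asking for might look like the following. Neither Collector nor this process() signature is in the current proposal; treat all of it as a hypothetical sketch.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical sketch: process() takes the ConsumerRecord directly (keeping
    // topic/partition/offset metadata), and all output goes through a
    // framework-owned Collector, so one shared producer can serve many
    // processor threads instead of one KafkaProducer per KafkaProcessor.
    interface Collector<K, V> {
        void send(ProducerRecord<K, V> record);
    }

    interface RecordProcessor<K, V> {
        void process(ConsumerRecord<K, V> record, Collector<K, V> collector);
    }

    // Example: forward every record to a single output topic.
    class PassThroughProcessor implements RecordProcessor<String, String> {
        @Override
        public void process(ConsumerRecord<String, String> record,
                            Collector<String, String> collector) {
            collector.send(new ProducerRecord<>("output", record.key(), record.value()));
        }
    }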
On Fri, Jul 24, 2015 at 12:12 AM, Neha Narkhede <n...@confluent.io> wrote:

Ewen:

* I think trivial filtering and aggregation on a single stream usually work fine with this model.

The way I see this, the process() API is an abstraction for message-at-a-time computations. In the future, you could imagine providing a simple DSL layer on top of the process() API that provides a set of APIs for stream processing operations on sets of messages, like joins, windows and various aggregations.

* Spark (and presumably Spark Streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk in the background. Will we take a hit because we always write to Kafka?

The goal isn't so much forcing materialization of intermediate results into Kafka as designing the API to integrate with Kafka so as to allow such materialization wherever it might be required. The downside with other stream processing frameworks is that they have weak integration with Kafka, where interaction with Kafka happens only at the endpoints of processing (first input, final output). Any intermediate operations that might benefit from persisting intermediate results into Kafka are forced to be broken up into two separate topologies/plans/stages of processing, which leads to more jobs. The implication is that the set of stream processing operations that should really have lived in one job per application is now split up across several piecemeal jobs that need to be monitored, managed and operated separately. The APIs should still allow in-memory storage of intermediate results where it makes sense.

Jiangjie:

* I just took a quick look at the KIP, is it very similar to mirror maker with message handler?

Not really. I wouldn't say it is similar, but mirror maker is a special instance of using copycat with a Kafka source and sink, plus optionally the process() API. I can imagine replacing MirrorMaker, in due course, with copycat + process().

Thanks,
Neha
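As a rough illustration of the layering Neha describes, DSL operations like filter and mapValues reduce to message-at-a-time processors. Reusing the hypothetical RecordProcessor/Collector shapes sketched earlier (again, none of this is the actual proposal):

    import java.util.function.Function;
    import java.util.function.Predicate;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Hypothetical sketch: a DSL layer compiling simple operations down to
    // message-at-a-time processors.
    class StreamOps {
        // filter: forward only records whose value passes the predicate
        static <K, V> RecordProcessor<K, V> filter(Predicate<V> pred, String outTopic) {
            return (record, collector) -> {
                if (pred.test(record.value())) {
                    collector.send(new ProducerRecord<>(outTopic, record.key(), record.value()));
                }
            };
        }

        // mapValues: transform each value, leaving the key (and thus partitioning) alone
        static <K, V> RecordProcessor<K, V> mapValues(Function<V, V> fn, String outTopic) {
            return (record, collector) -> collector.send(
                    new ProducerRecord<>(outTopic, record.key(), fn.apply(record.value())));
        }
    }

Joins, windows and aggregations would additionally need state and repartitioning underneath, which is where the materialization-into-Kafka question discussed above actually bites.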
On Thu, Jul 23, 2015 at 11:32 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

Hey Guozhang,

I just took a quick look at the KIP, is it very similar to mirror maker with message handler?

Thanks,

Jiangjie (Becket) Qin

On Thu, Jul 23, 2015 at 10:25 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

Just some notes on the KIP doc itself:

* It'd be useful to clarify at what point the plain consumer + custom code + producer breaks down. I think trivial filtering and aggregation on a single stream usually work fine with this model. Anything where you need more complex joins, windowing, etc. is where it breaks down. I think most interesting applications require that functionality, but it's helpful to make this really clear in the motivation -- right now, Kafka only provides the lowest-level plumbing for stream processing applications, so most interesting apps require very heavyweight frameworks.

* I think the feature comparison of the plain producer/consumer, stream processing frameworks, and this new library is a good start, but we might want something more thorough and structured, like a feature matrix. Right now it's hard to figure out exactly how they relate to each other.

* I'd personally push the library vs. framework story very strongly -- the total buy-in and weak integration story of stream processing frameworks is a big downside and makes a library a really compelling (and, as far as I am aware, currently unavailable) alternative.

* The comment about in-memory storage in other frameworks is interesting -- it is specific to the framework, but is supposed to also give performance benefits. The high-level functional processing interface would allow combining multiple operations when there's no shuffle, but when there is a shuffle, we'll always be writing to Kafka, right? Spark (and presumably Spark Streaming) is supposed to get a big win by handling shuffles such that the data just stays in cache and never actually hits disk, or at least hits disk in the background. Will we take a hit because we always write to Kafka?

* I really struggled with the structure of the KIP template with Copycat because the flow doesn't work well for proposals like this. They aren't the kind of concrete changes the KIP template was designed for. I'd completely ignore that template in favor of optimizing for clarity if I were you.

-Ewen
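As a starting point for the feature matrix Ewen suggests, here is a rough cut populated only with claims already made in this thread:

                                 consumer+producer   stream frameworks           proposed library
    trivial filter/aggregate    works fine          works                       works
    joins / windowing / state   breaks down         supported                   in scope
    adoption model              library             framework (total buy-in)    library
    Kafka integration           manual              endpoints only              first-class, incl. intermediate results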
On Thu, Jul 23, 2015 at 5:59 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi all,

I just posted KIP-28: Add a transform client for data processing
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+transform+client+for+data+processing>.

The wiki page does not yet have the full design / implementation details, and this email is to kick off the conversation on whether we should add this new client with the described motivations, and if so, what features / functionalities should be included.

Looking forward to your feedback!

-- Guozhang