No problem, I just wanted to check whether people still had more to say. I will wait another week.

Thanks,
Kyle

On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Hi Kyle,

Sorry for the delay in reviews. Tomorrow is the feature freeze deadline (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0), so people are busier than usual. Stay tuned.

Eno

On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Damian Guy, could you let me know if you plan to review this further? There is no rush, but if you don't have any additional comments I could start the voting and finish my WIP PR.

Thanks,
Kyle

On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Thanks Kyle, good arguments.

Eno

On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

*- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
I have added a more concrete example to the KIP.

*- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together.
I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
You would have to make a lot of checks to understand whether it is even possible to make this optimization:
1. Make sure they are all KTableKTableOuterJoins.
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used. (This may be impossible to verify, especially if they use KafkaStreams#store after the topology has already been built.)
You would then need to make decisions during the optimization:
1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This means you would have to create a new state store and not create any of the declared ones.

The main argument I have against it is that, even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.

In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join two or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
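To put numbers on the "N-1 intermediate tables" argument, here is a minimal sketch that only tallies what each approach builds for N co-partitioned inputs. The processor counts are the ones Kyle gives in his May 4 description (N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins and N-1 KTableKTableJoinMergers for the join chain; N KStreamAggregates and 1 KStreamCogroup for cogroup). The store, initializer and serde counts generalize his three-stream example to N, which is an assumption. The class CogroupTopologyCounts is hypothetical plain Java and does not call the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical tally of the two topologies discussed in the KIP-150 thread; no Kafka code involved. */
public class CogroupTopologyCounts {

    /** N inputs, each aggregated, then combined with a chain of N-1 KTable outer joins. */
    public static Map<String, Integer> outerJoinChain(int n) {
        requirePositive(n);
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("KStreamAggregate", n);
        counts.put("KTableKTableOuterJoin", 2 * (n - 1)); // one join processor per side of each join
        counts.put("KTableKTableJoinMerger", n - 1);      // one merger per join
        counts.put("stateStores", n);                     // one aggregate store per input (assumed)
        counts.put("initializers", n);
        counts.put("aggValueSerdes", n);
        return counts;
    }

    /** N inputs feeding one cogrouped aggregate, as in the proposed API. */
    public static Map<String, Integer> cogroup(int n) {
        requirePositive(n);
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("KStreamAggregate", n);
        counts.put("KStreamCogroup", 1);
        counts.put("stateStores", 1);
        counts.put("initializers", 1);
        counts.put("aggValueSerdes", 1);
        return counts;
    }

    /** Sums only the processor entries, i.e. names starting with "KStream" or "KTable". */
    public static int processors(Map<String, Integer> counts) {
        int total = 0;
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getKey().startsWith("KStream") || e.getKey().startsWith("KTable")) {
                total += e.getValue();
            }
        }
        return total;
    }

    private static void requirePositive(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one input stream, got " + n);
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] {2, 3, 12}) {
            System.out.printf("N=%d: join chain %d processors / %d stores, cogroup %d processors / %d store%n",
                n, processors(outerJoinChain(n)), outerJoinChain(n).get("stateStores"),
                processors(cogroup(n)), cogroup(n).get("stateStores"));
        }
    }
}
```

For N = 3 this gives 9 processors and 3 stores for the join chain against 4 processors and 1 store for cogroup; for the dozen source systems in Jay's customer-profile example (N = 12) it is 45 processors against 13. For N = 1 the chain is a lone KStreamAggregate while cogroup adds a pass-through KStreamCogroup, which is the degenerate case Kyle suggests could be treated as a plain KGroupedStream#aggregate.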

*- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
I would argue that cogroup is not just an optimization; it is a new way for users to approach a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I would view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together.
I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:

I haven't digested the proposal but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc.). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.

-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Yeah, that's a good way to look at it.
I have seen this type of functionality in a couple of other platforms, like Spark and Pig:
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value; is that correct? Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I sure can. I have added the following description to my KIP.
If this doesn't help, let me know and I will take some more time to build a diagram and write more of a step-by-step description:

Example with Current API:

    // one aggregate (with its own store, initializer and aggValueSerde) per input topic
    KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
        .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
    KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
        .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
    KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
        .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
    // N-1 outer joins combine the intermediate values V1, V2, V3 into the final CG value
    KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
        .outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2 and V3 all the same type as CG, so that the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value, which gets sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.

If you think through all the possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must be used.

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

Example with Proposed API:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
    // a single initializer, aggValueSerde and store for the final CG value;
    // each additional grouped stream only supplies its own aggregator
    KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
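In the spirit of Matthias's request for "3 streams with 3 records each (plus expected result)" and the store calls each approach needs, the following is a plain-Java simulation of the two designs described above, not Kafka Streams code. Everything in it is a hypothetical assumption for illustration: values are longs, each per-topic aggregator is a running sum, the final CG value (Totals) keeps one sum per topic, and the class and method names are invented. The join-chain path follows Kyle's statement that every input queries all three stores and uses both joiners; the cogroup path reads and writes a single store. It ignores record caching, serdes, repartitioning and the old/new value pairs that real KTable processors forward.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of a KTable outer-join chain versus a single cogroup; no Kafka dependency. */
public class CogroupSimulation {

    /** Hypothetical final aggregate (CG): one running sum per input topic. */
    static final class Totals {
        final long t1, t2, t3;
        Totals(long t1, long t2, long t3) { this.t1 = t1; this.t2 = t2; this.t3 = t3; }
        @Override public String toString() { return "(" + t1 + ", " + t2 + ", " + t3 + ")"; }
    }

    /** One input record; topicIndex 0, 1, 2 stands for "topic1", "topic2", "topic3". */
    static final class InputRecord {
        final int topicIndex; final String key; final long value;
        InputRecord(int topicIndex, String key, long value) {
            this.topicIndex = topicIndex; this.key = key; this.value = value;
        }
    }

    /** Current API: one aggregate store per topic, combined by two outer-join joiners. */
    static final class JoinChain {
        private final List<Map<String, Long>> stores = new ArrayList<>();
        int storeReads = 0, joinerCalls = 0;
        JoinChain() { for (int i = 0; i < 3; i++) stores.add(new HashMap<>()); }

        Totals process(InputRecord r) {
            // KStreamAggregate: read this topic's store, add the value (initializer = 0), write back
            Map<String, Long> own = stores.get(r.topicIndex);
            storeReads++;
            long updated = own.getOrDefault(r.key, 0L) + r.value;
            own.put(r.key, updated);
            // outer joins: look the key up in the other two stores (null = not seen on that topic yet)
            Long[] current = new Long[3];
            for (int i = 0; i < 3; i++) {
                if (i == r.topicIndex) continue;
                storeReads++;
                current[i] = stores.get(i).get(r.key);
            }
            current[r.topicIndex] = updated;
            return joinerOneTwoAndThree(joinerOneAndTwo(current[0], current[1]), current[2]);
        }
        private long[] joinerOneAndTwo(Long v1, Long v2) {
            joinerCalls++;
            return new long[] {v1 == null ? 0 : v1, v2 == null ? 0 : v2};
        }
        private Totals joinerOneTwoAndThree(long[] v12, Long v3) {
            joinerCalls++;
            return new Totals(v12[0], v12[1], v3 == null ? 0 : v3);
        }
    }

    /** Proposed API: one store holding CG; each topic only supplies its own aggregator. */
    static final class Cogroup {
        private final Map<String, Totals> store = new HashMap<>();
        int storeReads = 0, joinerCalls = 0; // joinerCalls stays 0: cogroup has no joiners
        Totals process(InputRecord r) {
            storeReads++;
            Totals agg = store.getOrDefault(r.key, new Totals(0, 0, 0)); // the single initializer
            Totals updated;
            switch (r.topicIndex) { // the per-topic aggregators
                case 0: updated = new Totals(agg.t1 + r.value, agg.t2, agg.t3); break;
                case 1: updated = new Totals(agg.t1, agg.t2 + r.value, agg.t3); break;
                case 2: updated = new Totals(agg.t1, agg.t2, agg.t3 + r.value); break;
                default: throw new IllegalArgumentException("unknown topic index " + r.topicIndex);
            }
            store.put(r.key, updated);
            return updated; // KStreamCogroup just forwards the new aggregate
        }
    }

    static List<InputRecord> sampleInput() {
        return Arrays.asList(
            new InputRecord(0, "A", 1), new InputRecord(1, "A", 10), new InputRecord(2, "A", 100),
            new InputRecord(0, "B", 2), new InputRecord(1, "B", 20), new InputRecord(2, "A", 200),
            new InputRecord(0, "A", 3), new InputRecord(1, "B", 30), new InputRecord(2, "B", 300));
    }

    public static void main(String[] args) {
        JoinChain chain = new JoinChain();
        Cogroup cogroup = new Cogroup();
        Map<String, Totals> chainResult = new TreeMap<>();
        Map<String, Totals> cogroupResult = new TreeMap<>();
        for (InputRecord r : sampleInput()) {
            chainResult.put(r.key, chain.process(r));
            cogroupResult.put(r.key, cogroup.process(r));
        }
        // prints: join chain: {A=(4, 10, 300), B=(2, 50, 300)} store reads=27 joiner calls=18
        System.out.println("join chain: " + chainResult + " store reads=" + chain.storeReads + " joiner calls=" + chain.joinerCalls);
        // prints: cogroup: {A=(4, 10, 300), B=(2, 50, 300)} store reads=9 joiner calls=0
        System.out.println("cogroup: " + cogroupResult + " store reads=" + cogroup.storeReads + " joiner calls=" + cogroup.joinerCalls);
    }
}
```

The sample input has three records per topic over keys "A" and "B". Both paths emit the same value for every input record; the difference is the bookkeeping: 27 store reads and 18 joiner calls for the chain versus 9 store reads and no joiners for cogroup.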

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Kyle,

thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing, to see what store calls would be required for both approaches etc.

I think it's pretty advanced stuff you propose, and it would help to understand it better.

Thanks a lot!

-Matthias

On 5/4/17 11:39 AM, Kyle Winkelman wrote:

I have made a pull request. It can be found here.

https://github.com/apache/kafka/pull/2975

I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.

One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?

Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.

Thanks,
Kyle

On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:

I’ll look as well asap, sorry, been swamped.

Eno

On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against Kafka trunk and mark it as WIP? It will be easier to review what you have done.

Thanks,
Damian

On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)

On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Hello all,

I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.

Please find the KIP here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup

Please find my initial implementation here:
https://github.com/KyleWinkelman/kafka

Thanks,
Kyle Winkelman