Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?
I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> Thanks Kyle, good arguments.
>
> Eno
>
> > On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >
> > *- minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?*
> > I have added a more concrete example to the KIP.
> >
> > *- my main concern is that we're exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.*
> > You would have to make a lot of checks to understand if it is even possible to make this optimization:
> > 1. Make sure they are all KTableKTableOuterJoins.
> > 2. None of the intermediate KTables are used for anything else.
> > 3. None of the intermediate stores are used. (This may be impossible, especially if they use KafkaStreams#store after the topology has already been built.)
> > You would then need to make decisions during the optimization:
> > 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
> > 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
> > 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
> >
> > The main argument I have against it is that even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
> >
> > In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join 2 or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
> >
> > *- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
> > I would argue that cogroup is not just an optimization; it is a new way for users to look at accomplishing a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.
> > Now if, for example, someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call.
> > This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there was just a call to KGroupedStream#aggregate instead of calling KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)
> >
> > Thanks,
> > Kyle
> >
> > On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Kyle,
> >>
> >> Thanks for the KIP again. A couple of comments:
> >>
> >> - minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?
> >>
> >> - my main concern is that we're exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.
> >>
> >> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
> >>>
> >>> I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
> >>>
> >>> -Jay
> >>>
> >>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>
> >>>> Yea, that's a good way to look at it.
> >>>> I have seen this type of functionality in a couple of other platforms like Spark and Pig.
> >>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> >>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
> >>>>
> >>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
> >>>>
> >>>>> Hi Kyle,
> >>>>>
> >>>>> If I'm reading this correctly it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct?
> >>>>> Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
> >>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
> >>>>>
> >>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>
> >>>>>> I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
> >>>>>>
> >>>>>> Example with Current API:
> >>>>>>
> >>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
> >>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
> >>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
> >>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> >>>>>>
> >>>>>> As you can see this creates 3 StateStores, requires 3 initializers, and 3 aggValueSerdes. This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
> >>>>>>
> >>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
> >>>>>>
> >>>>>> If you think through all the possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.
> >>>>>>
> >>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
> >>>>>>
> >>>>>>
> >>>>>> Example with Proposed API:
> >>>>>>
> >>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
> >>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
> >>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
> >>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
> >>>>>>     .cogroup(grouped2, aggregator2)
> >>>>>>     .cogroup(grouped3, aggregator3)
> >>>>>>     .aggregate();
> >>>>>>
> >>>>>> As you can see this creates 1 StateStore, requires 1 initializer, and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
> >>>>>>
> >>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
> >>>>>>
> >>>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
> >>>>>>
> >>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Kyle,
> >>>>>>>
> >>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
> >>>>>>>
> >>>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.
> >>>>>>>
> >>>>>>> Thanks a lot!
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
> >>>>>>>> I have made a pull request. It can be found here.
> >>>>>>>>
> >>>>>>>> https://github.com/apache/kafka/pull/2975
> >>>>>>>>
> >>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
> >>>>>>>>
> >>>>>>>> One thing I was curious about is that during the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
> >>>>>>>>
> >>>>>>>> Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Kyle
> >>>>>>>>
> >>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> I'll look as well asap, sorry, been swamped.
> >>>>>>>>>
> >>>>>>>>> Eno
> >>>>>>>>>
> >>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi Kyle,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Damian
> >>>>>>>>>>
> >>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)
> >>>>>>>>>>>
> >>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello all,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please find the KIP here:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please find my initial implementation here:
> >>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Kyle Winkelman
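For concreteness, Jay's "customer 360" use case from the thread above could be written against the proposed API roughly as follows. This is only a sketch: the topic names, the CustomerProfile type and its with* methods, profileSerde, the store name, and the builder variable are hypothetical, and the cogroup()/aggregate() signatures follow the shape discussed in this thread rather than any released Kafka Streams API.

    // Hypothetical "customer 360" sketch using the cogroup API shape proposed in KIP-150.
    // CustomerProfile, its with* methods, profileSerde and the topic/store names are made up.
    KGroupedStream<String, Account> accounts =
        builder.<String, Account>stream("accounts").groupByKey();
    KGroupedStream<String, BillingInfo> billing =
        builder.<String, BillingInfo>stream("billing").groupByKey();
    KGroupedStream<String, SupportContact> support =
        builder.<String, SupportContact>stream("support-contacts").groupByKey();

    // One initializer, one serde and one store for the final profile; each input stream
    // only contributes an aggregator describing how it updates the shared profile.
    KTable<String, CustomerProfile> profiles =
        accounts.cogroup(CustomerProfile::new,
                         (customerId, account, profile) -> profile.withAccount(account),
                         profileSerde, "customer-profile-store")
                .cogroup(billing, (customerId, bill, profile) -> profile.withBilling(bill))
                .cogroup(support, (customerId, contact, profile) -> profile.withContact(contact))
                .aggregate();

Because all of the input topics are keyed by customer id and co-partitioned, a single state store ("customer-profile-store") would hold the up-to-date profile, instead of the N-1 intermediate tables that a chain of outer joins would materialize.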
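Similarly, the degenerate case Kyle mentions (cogroup called without any additional streams) reduces to a plain aggregate. A minimal sketch, again assuming the signatures proposed in this thread and the existing KGroupedStream#aggregate overload shown in the "Example with Current API":

    // Hypothetical sketch: a single-stream cogroup ...
    KTable<K, CG> viaCogroup =
        grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
                .aggregate();

    // ... is equivalent to calling aggregate directly on the grouped stream.
    KTable<K, CG> viaAggregate =
        grouped1.aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);

This is the case where, as noted in the thread, the KStreamCogroup node (a pass-through processor) could either be elided or simply trigger a warning.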