Thanks Kyle, good arguments.

Eno
> On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>
> *- minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?*
> I have added a more concrete example to the KIP.
>
> *- my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.*
> You would have to make a lot of checks to understand whether the optimization is even possible:
> 1. Make sure they are all KTableKTableOuterJoins.
> 2. None of the intermediate KTables are used for anything else.
> 3. None of the intermediate stores are used. (This may be impossible to verify, especially if they use KafkaStreams#store after the topology has already been built.)
> You would then need to make decisions during the optimization:
> 1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
> 2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
> 3. Which state store would you use? The ones declared would be for the aggregate values. None of the declared ones would be guaranteed to hold the final object. This would mean you must create a new state store and not create any of the declared ones.
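Kyle's decision points above can be made concrete with a rough plain-Java sketch. Nothing below is Kafka Streams code; the `CG` class, the shared store, and the per-topic counting aggregator are hypothetical stand-ins, used only to illustrate what a single composite initializer plus per-stream aggregators mutating one final object would look like:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical final aggregate type: here, a count of records per source topic.
class CG {
    final Map<String, Long> counts = new HashMap<>();
    long get(String topic) { return counts.getOrDefault(topic, 0L); }
}

public class CompositeAggregatorSketch {
    // One shared store keyed by record key, holding the final CG value directly,
    // instead of N intermediate stores whose contents are joined afterwards.
    static final Map<String, CG> store = new HashMap<>();

    // Composite initializer: the single replacement for N individual initializers.
    static final Supplier<CG> initializer = CG::new;

    // Per-stream aggregators that all mutate the same final object. Deriving this
    // automatically from N aggregators plus N-1 valueJoiners is the hard part
    // Kyle describes in decision point 2.
    static CG aggregate(String topic, String key, String value) {
        CG cg = store.computeIfAbsent(key, k -> initializer.get());
        cg.counts.merge(topic, 1L, Long::sum); // this stream's contribution to CG
        return cg;
    }

    public static void main(String[] args) {
        aggregate("topic1", "alice", "a");
        aggregate("topic2", "alice", "b");
        CG cg = aggregate("topic1", "alice", "c");
        System.out.println(cg.get("topic1") + " " + cg.get("topic2")); // prints "2 1"
    }
}
```

The point of the sketch is decision point 3: the store holds the final `CG` shape, so none of the stores a user declared for intermediate aggregates could be reused for it.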
>
> The main argument I have against it is that even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.
>
> In my opinion cogroup is a common enough case that it should be part of the composable pieces (group, aggregate, join), because we want to allow people to join two or more streams in an easy way. Right now I don't think we give them ways of handling this use case easily.
>
> *- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*
> I would argue that cogroup is not just an optimization; it is a new way for users to look at accomplishing a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes, and stores if all they care about is the final object.
> Now, if for example someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This case is what I view as an optimization: we could remove the KStreamCogroup and act as if there were just a call to KGroupedStream#aggregate instead of KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is to be used.)
>
> Thanks,
> Kyle
>
> On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Kyle,
>>
>> Thanks for the KIP again. A couple of comments:
>>
>> - minor: could you add an exact example (similar to what Jay’s example is, or like your Spark/Pig pointers had) to make this super concrete?
>>
>> - my main concern is that we’re exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I’d like to hear your thoughts on whether it’s possible to do this optimization with the current DSL, at the topology builder level.
>>
>> I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.
>>
>> Thanks
>> Eno
>>
>>> On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:
>>>
>>> I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up-to-date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned. A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same thing, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies they are already co-partitioned.
>>>
>>> -Jay
>>>
>>> On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>
>>>> Yea, that's a good way to look at it.
>>>> I have seen this type of functionality in a couple of other platforms, like Spark and Pig:
>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
>>>> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>>>>
>>>> On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>
>>>>> Hi Kyle,
>>>>>
>>>>> If I'm reading this correctly it is like an N-way outer join? So an input on any stream will always produce a new aggregated value - is that correct? Effectively, each Aggregator just looks up the current value, aggregates, and forwards the result.
>>>>> I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.
>>>>>
>>>>> On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>>> I sure can. I have added the following description to my KIP. If this doesn't help, let me know and I will take some more time to build a diagram and make more of a step-by-step description:
>>>>>>
>>>>>> Example with Current API:
>>>>>>
>>>>>> KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
>>>>>> KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
>>>>>> KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
>>>>>> KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
>>>>>>
>>>>>> As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes.
>>>>>> This also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so the two joiners are more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map, and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
>>>>>>
>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, grabbing the current aggregate from storeName1. It will produce this in the form of the first intermediate value and get sent through a KTableKTableOuterJoin, where it will look up the current value of the key in storeName2. It will use the first joiner to calculate the second intermediate value, which will go through an additional KTableKTableOuterJoin. Here it will look up the current value of the key in storeName3 and use the second joiner to build the final aggregate value.
>>>>>>
>>>>>> If you think through all possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must get used.
>>>>>>
>>>>>> Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.
>>>>>>
>>>>>> Example with Proposed API:
>>>>>>
>>>>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>>>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>>>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>>>> KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
>>>>>>     .cogroup(grouped2, aggregator2)
>>>>>>     .cogroup(grouped3, aggregator3)
>>>>>>     .aggregate();
>>>>>>
>>>>>> As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.
>>>>>>
>>>>>> When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store, and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is pretty much just a pass-through processor, and you are done.
>>>>>>
>>>>>> Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
>>>>>>
>>>>>> On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Kyle,
>>>>>>>
>>>>>>> thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely. Could you maybe add a more concrete example, like 3 streams with 3 records each (plus expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing to see what store calls would be required for both approaches etc.
>>>>>>>
>>>>>>> I think it's pretty advanced stuff you propose, and it would help to understand it better.
>>>>>>>
>>>>>>> Thanks a lot!
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 5/4/17 11:39 AM, Kyle Winkelman wrote:
>>>>>>>> I have made a pull request. It can be found here.
>>>>>>>>
>>>>>>>> https://github.com/apache/kafka/pull/2975
>>>>>>>>
>>>>>>>> I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.
>>>>>>>>
>>>>>>>> One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?
>>>>>>>>
>>>>>>>> Should this be expanded to handle grouped tables? This would be pretty easy for a normal aggregate, but one allowing session stores and windowed stores would require KTableSessionWindowAggregate and KTableWindowAggregate implementations.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Kyle
>>>>>>>>
>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" <eno.there...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I’ll look as well asap, sorry, been swamped.
>>>>>>>>>
>>>>>>>>> Eno
>>>>>>>>>
>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Kyle,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP. I apologize that I haven't had the chance to look at the KIP yet, but will schedule some time to look into it tomorrow. For the implementation, can you raise a PR against kafka trunk and mark it as WIP? It will be easier to review what you have done.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Damian
>>>>>>>>>>
>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am replying to this in hopes it will draw some attention to my KIP, as I haven't heard from anyone in a couple of days. This is my first KIP and my first large contribution to the project, so I'm sure I did something wrong. ;)
>>>>>>>>>>>
>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about adding cogroup to the streams DSL.
>>>>>>>>>>>>
>>>>>>>>>>>> Please find the KIP here:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
>>>>>>>>>>>>
>>>>>>>>>>>> Please find my initial implementation here:
>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Kyle Winkelman
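The central claim of the thread - that a chain of per-topic aggregates plus outer joins and a single cogrouped store can compute the same per-key result - can be simulated without Kafka Streams. The sketch below is a plain-Java illustration only: the record set, the string-concatenation aggregators, and the three-slot final value are all hypothetical, standing in for the `table1`/`table2`/`table3` and `CG` of Kyle's examples.

```java
import java.util.HashMap;
import java.util.Map;

public class CogroupVsJoins {
    // Co-partitioned input records: (topic, key, value).
    static final String[][] RECORDS = {
        {"topic1", "k", "a"}, {"topic2", "k", "b"},
        {"topic3", "k", "c"}, {"topic1", "k", "d"},
    };

    // Current-API shape: N intermediate stores (one aggregate per topic),
    // then joiners combining the intermediates into the final value.
    static String viaJoins() {
        Map<String, StringBuilder> store1 = new HashMap<>();
        Map<String, StringBuilder> store2 = new HashMap<>();
        Map<String, StringBuilder> store3 = new HashMap<>();
        for (String[] r : RECORDS) {
            Map<String, StringBuilder> store =
                r[0].equals("topic1") ? store1 : r[0].equals("topic2") ? store2 : store3;
            store.computeIfAbsent(r[1], k -> new StringBuilder()).append(r[2]);
        }
        // The "joiners": merge the three intermediate values for key "k".
        String v1 = store1.getOrDefault("k", new StringBuilder()).toString();
        String v2 = store2.getOrDefault("k", new StringBuilder()).toString();
        String v3 = store3.getOrDefault("k", new StringBuilder()).toString();
        return v1 + "|" + v2 + "|" + v3;
    }

    // Proposed cogroup shape: one store holding the final value, with a
    // per-topic aggregator updating it directly - no intermediate tables.
    static String viaCogroup() {
        Map<String, String[]> store = new HashMap<>();
        for (String[] r : RECORDS) {
            String[] cg = store.computeIfAbsent(r[1], k -> new String[] {"", "", ""});
            int slot = r[0].equals("topic1") ? 0 : r[0].equals("topic2") ? 1 : 2;
            cg[slot] += r[2]; // this stream's aggregator mutates the final value
        }
        String[] cg = store.get("k");
        return cg[0] + "|" + cg[1] + "|" + cg[2];
    }

    public static void main(String[] args) {
        System.out.println(viaJoins());   // prints "ad|b|c"
        System.out.println(viaCogroup()); // prints "ad|b|c"
    }
}
```

Both paths produce the same final value; the difference is the bookkeeping the user must do: three stores and two joiners in the first shape, one store and one aggregator per stream in the second.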