Yeah, I wish we'd named KGroupedStream GroupedKStream, and similarly for KGroupedTable.
On Tue, 16 May 2017 at 17:59 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I have added code blocks and a note about the partial results.

Can I ask why you don't like KCogroupedStream? I just think that because it is created from a KGroupedStream, we should keep a similar naming format.

On May 16, 2017 9:23 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

Can you put the code examples etc. in {code} blocks to make them easier to read?

I think this is probably a pretty common use case and therefore a worthwhile addition to the API.

I'd suggest dropping the K from KCogroupedStream and calling it CogroupedStream or CogroupedKStream.

In your example, wouldn't the output potentially have more partial results? I.e., for each input on any stream you'd potentially see a record produced? I think it is worth mentioning this.

Thanks,
Damian

On Tue, 16 May 2017 at 12:26 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

No problem, I just wanted to make sure people still had more to say. I will wait another week.

Thanks,
Kyle

On May 16, 2017 4:25 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Hi Kyle,

Sorry for the delay in reviews; tomorrow is the feature freeze deadline (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0), so people are busier than usual. Stay tuned.

Eno

On 15 May 2017, at 13:25, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Damian Guy, could you let me know if you plan to review this further? There is no rush, but if you don't have any additional comments, I could start the voting and finish my WIP PR.
Thanks,
Kyle

On May 9, 2017 11:07 AM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:

Eno, is there anyone else who is an expert in the Kafka Streams realm that I should reach out to for input?

I believe Damian Guy is still planning on reviewing this more in depth, so I will wait for his input before continuing.

On May 9, 2017 7:30 AM, "Eno Thereska" <eno.there...@gmail.com> wrote:

Thanks Kyle, good arguments.

Eno

On May 7, 2017, at 5:06 PM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

*- minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?*

I have added a more concrete example to the KIP.

*- my main concern is that we're exposing this optimization to the DSL. In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes etc). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.*

You would have to make a lot of checks to determine whether this optimization is even possible:
1. Make sure the joins are all KTableKTableOuterJoins.
2. None of the intermediate KTables are used for anything else.
3. None of the intermediate stores are used. (This may be impossible to verify, especially if they use KafkaStreams#store after the topology has already been built.)

You would then need to make decisions during the optimization:
1. Your new initializer would be the composite of all the individual initializers and the valueJoiners.
2. I am having a hard time thinking about how you would turn the aggregators and valueJoiners into an aggregator that would work on the final object, but this may be possible.
3. Which state store would you use? The declared ones would hold the intermediate aggregate values, and none of them would be guaranteed to hold the final object. This means you would have to create a new state store and not create any of the declared ones.

My main argument against it is that, even if it could be done, I don't know that we would want this to be an optimization in the background, because the user would still be required to think about all of the intermediate values that they shouldn't need to worry about if they only care about the final object.

In my opinion, cogroup is a common enough case that it should be one of the composable pieces (group, aggregate, join), because we want to allow people to join 2 or more streams in an easy way. Right now I don't think we give them an easy way of handling this use case.
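A minimal in-memory sketch of the cogroup semantics argued for here may help: one initializer, one aggregator per grouped stream, and a single store that holds only the final object, with no intermediate tables or joiners. This is plain Java, not Kafka Streams; the class `CogroupSketch`, the interface `CogroupAggregator`, and the stream names are hypothetical, and a `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical aggregator shape: (key, incoming value, current aggregate) -> new aggregate.
interface CogroupAggregator<K, V, CG> {
    CG apply(K key, V value, CG aggregate);
}

// Hypothetical in-memory model of cogroup; not the Kafka Streams API.
final class CogroupSketch<K, CG> {
    private final Supplier<CG> initializer;                 // the one initializer
    private final Map<String, CogroupAggregator<K, Object, CG>> aggregators = new HashMap<>();
    final Map<K, CG> store = new HashMap<>();               // the one state store

    CogroupSketch(Supplier<CG> initializer) {
        this.initializer = initializer;
    }

    /** Registers how records from one grouped stream update the final object. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    <V> CogroupSketch<K, CG> cogroup(String stream, CogroupAggregator<K, V, CG> aggregator) {
        // Unchecked: callers must pass values of type V for this stream to process().
        aggregators.put(stream, (CogroupAggregator) aggregator);
        return this;
    }

    /** Handles one record: one store read, one aggregator call, one store write. */
    CG process(String stream, K key, Object value) {
        CogroupAggregator<K, Object, CG> aggregator = aggregators.get(stream);
        if (aggregator == null) {
            throw new IllegalArgumentException("unknown stream: " + stream);
        }
        CG current = store.get(key);
        if (current == null) {
            current = initializer.get();
        }
        CG updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated; // forwarded on every input, so partial results are visible downstream
    }

    public static void main(String[] args) {
        // Three co-partitioned inputs keyed by customer id, in the spirit of a "customer 360" profile.
        CogroupSketch<String, String> profiles = new CogroupSketch<String, String>(() -> "")
            .cogroup("accounts", (String id, String name, String p) -> p + "name=" + name + ";")
            .cogroup("billing", (String id, Integer cents, String p) -> p + "billed=" + cents + ";")
            .cogroup("purchases", (String id, String item, String p) -> p + "bought=" + item + ";");
        profiles.process("accounts", "c1", "Ada");
        profiles.process("billing", "c1", 1250);
        System.out.println(profiles.process("purchases", "c1", "book"));
    }
}
```

Running `main` should print `name=Ada;billed=1250;bought=book;`. Each `process` call touches only the single store for its key and returns the updated aggregate, which is the flow described for the proposed API; the sketch deliberately ignores serdes, repartitioning and windowing.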
*- I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.*

I would argue that cogroup is not just an optimization; it is a new way for users to approach a problem that requires multiple streams. I may sound like a broken record, but I don't think users should have to build the N-1 intermediate tables and deal with their initializers, serdes and stores if all they care about is the final object.

Now, if for example someone uses cogroup but doesn't supply additional streams and aggregators, this case is equivalent to a single grouped stream making an aggregate call. This is the case I would view as an optimization: we could remove the KStreamCogroup and act as if there were just a call to KGroupedStream#aggregate instead of KGroupedStream#cogroup. (I would prefer to just write a warning saying that this is not how cogroup is meant to be used.)

Thanks,
Kyle

On Sun, May 7, 2017 at 5:41 AM, Eno Thereska <eno.there...@gmail.com> wrote:

Hi Kyle,

Thanks for the KIP again. A couple of comments:

- minor: could you add an exact example (similar to what Jay's example is, or like your Spark/Pig pointers had) to make this super concrete?

- my main concern is that we're exposing this optimization to the DSL.
In an ideal world, an optimizer would take the existing DSL and do the right thing under the covers (create just one state store, arrange the nodes, etc.). The original DSL had a bunch of small, composable pieces (group, aggregate, join) that this proposal groups together. I'd like to hear your thoughts on whether it's possible to do this optimization with the current DSL, at the topology builder level.

I think there will be scope for several such optimizations in the future and perhaps at some point we need to think about decoupling the 1:1 mapping from the DSL into the physical topology.

Thanks
Eno

On May 5, 2017, at 4:39 PM, Jay Kreps <j...@confluent.io> wrote:

I haven't digested the proposal, but the use case is pretty common. An example would be the "customer 360" or "unified customer profile" use case we often use. In that use case you have a dozen systems, each of which has some information about your customer (account details, settings, billing info, customer service contacts, purchase history, etc.). Your goal is to join/munge these into a single profile record for each customer that has all the relevant info in a usable form and is up to date with all the source systems. If you implement that with kstreams as a sequence of joins, I think today we'd fully materialize N-1 intermediate tables. But clearly you only need a single stage to group all these things that are already co-partitioned.
A distributed database would do this under the covers, which is arguably better (at least when it does the right thing), and perhaps we could do the same, but I'm not sure we know the partitioning, so we may need an explicit cogroup command that implies the inputs are already co-partitioned.

-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:

Yeah, that's a good way to look at it.
I have seen this type of functionality in a couple of other platforms, like Spark and Pig:
https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm

On May 5, 2017 7:43 AM, "Damian Guy" <damian....@gmail.com> wrote:

Hi Kyle,

If I'm reading this correctly, it is like an N-way outer join? So an input on any stream will always produce a new aggregated value; is that correct? Effectively, each Aggregator just looks up the current value, aggregates and forwards the result.
I need to look into it and think about it a bit more, but it seems like it could be a useful optimization.

On Thu, 4 May 2017 at 23:21 Kyle Winkelman <winkelman.k...@gmail.com> wrote:

I sure can. I have added the following description to my KIP.
If this doesn't help, let me know and I will take some more time to build a diagram and write a more step-by-step description:

Example with Current API:

    KTable<K, V1> table1 = builder.stream("topic1").groupByKey()
        .aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
    KTable<K, V2> table2 = builder.stream("topic2").groupByKey()
        .aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
    KTable<K, V3> table3 = builder.stream("topic3").groupByKey()
        .aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
    KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo)
        .outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values are going to be (V1, V2, V3). They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so that the two joiners act more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.
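A hypothetical plain-Java model of this chained-join pipeline (not Kafka Streams code) illustrates why every input ends up touching every store. Three maps stand in for storeName1 to storeName3, each per-topic aggregate is just a count standing in for V1, V2 and V3, and two illustrative joiners build a summary string standing in for CG. As a simplification, the model re-reads all three stores after updating the record's own aggregate.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical in-memory model of the current-API pipeline; not Kafka Streams code.
final class ChainedJoinSketch {
    // One map per input topic, standing in for storeName1..storeName3.
    private final Map<Integer, Map<String, Integer>> stores = new HashMap<>();
    // Topics whose store was read while handling the most recent record, in read order.
    final Set<Integer> storesRead = new LinkedHashSet<>();

    // Illustrative joiners: V1..V3 are per-topic counts and CG is a summary string.
    private final BiFunction<Integer, Integer, String> joinerOneAndTwo =
        (v1, v2) -> "t1=" + v1 + ",t2=" + v2;
    private final BiFunction<String, Integer, String> joinerOneTwoAndThree =
        (v12, v3) -> v12 + ",t3=" + v3;

    ChainedJoinSketch() {
        for (int topic = 1; topic <= 3; topic++) {
            stores.put(topic, new HashMap<>());
        }
    }

    private Integer read(int topic, String key) {
        storesRead.add(topic);
        return stores.get(topic).get(key); // null means that side of the outer join is empty
    }

    /** Handles one record arriving on topic 1, 2 or 3 and returns the new CG value. */
    String process(int topic, String key) {
        storesRead.clear();
        // KStreamAggregate for the record's own topic: read, aggregate (count), write back.
        Integer current = read(topic, key);
        stores.get(topic).put(key, current == null ? 1 : current + 1);
        // The chained outer joins then need the key's value from every store.
        Integer v1 = read(1, key);
        Integer v2 = read(2, key);
        Integer v3 = read(3, key);
        return joinerOneTwoAndThree.apply(joinerOneAndTwo.apply(v1, v2), v3);
    }

    public static void main(String[] args) {
        ChainedJoinSketch sketch = new ChainedJoinSketch();
        System.out.println(sketch.process(1, "alice") + " read " + sketch.storesRead);
        System.out.println(sketch.process(3, "alice") + " read " + sketch.storesRead);
    }
}
```

Running `main` should print `t1=1,t2=null,t3=null read [1, 2, 3]` followed by `t1=1,t2=null,t3=1 read [3, 1, 2]`: whichever topic a record arrives on, all three stores are read, and the `null`s show where an outer join is still missing a side.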
When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate, which grabs the current aggregate from storeName1 and produces the first intermediate value. That value is sent through a KTableKTableOuterJoin, which looks up the current value of the key in storeName2 and uses the first joiner to calculate the second intermediate value. This goes through an additional KTableKTableOuterJoin, which looks up the current value of the key in storeName3 and uses the second joiner to build the final aggregate value.

If you think through all the possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must be used.

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.

Example with Proposed API:

    KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
    KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
    KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
    KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream affects the creation of the final CG object.

When a new input arrives, let's say at "topic1", it will first go through a KStreamAggregate and grab the current aggregate from storeName1. It will add its incoming object to the aggregate, update the store and pass the new aggregate on. This new aggregate goes through the KStreamCogroup, which is essentially a pass-through processor, and you are done.

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

On Thu, May 4, 2017 at 4:42 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Kyle,

thanks a lot for the KIP. Maybe I am a little slow, but I could not follow completely.
Could you maybe add a more concrete example, like 3 streams with 3 records each (plus the expected result), and show the difference between the current way to implement it and the proposed API? This could also cover the internal processing, to see what store calls would be required for both approaches, etc.

I think it's pretty advanced stuff you propose, and it would help to understand it better.

Thanks a lot!

-Matthias

On 5/4/17 11:39 AM, Kyle Winkelman wrote:

I have made a pull request. It can be found here:

https://github.com/apache/kafka/pull/2975

I plan to write some more unit tests for my classes and get around to writing documentation for the public API additions.

One thing I was curious about: in the KCogroupedStreamImpl#aggregate method I pass null to the KGroupedStream#repartitionIfRequired method. I can't supply the store name, because if more than one grouped stream repartitions, an error is thrown. Is there some name that someone can recommend, or should I leave the null and allow it to fall back to the KGroupedStream.name?

Should this be expanded to handle grouped tables?
This > would > > be > > > >>>>>>>> pretty > > > >>>>>>>>>> easy > > > >>>>>>>>>>> for a normal aggregate but one allowing session stores and > > > >>> windowed > > > >>>>>>>>>> stores > > > >>>>>>>>>>> would required KTableSessionWindowAggregate and > > > >>>>>>> KTableWindowAggregate > > > >>>>>>>>>>> implementations. > > > >>>>>>>>>>> > > > >>>>>>>>>>> Thanks, > > > >>>>>>>>>>> Kyle > > > >>>>>>>>>>> > > > >>>>>>>>>>> On May 4, 2017 1:24 PM, "Eno Thereska" < > > eno.there...@gmail.com > > > > > > > >>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>>> I’ll look as well asap, sorry, been swamped. > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> Eno > > > >>>>>>>>>>>>> On May 4, 2017, at 6:17 PM, Damian Guy < > > damian....@gmail.com > > > > > > > >>>>>>>> wrote: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Hi Kyle, > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks for the KIP. I apologize that i haven't had the > > chance > > > >>> to > > > >>>>>>>> look > > > >>>>>>>>>> at > > > >>>>>>>>>>>>> the KIP yet, but will schedule some time to look into it > > > >>>>>>> tomorrow. > > > >>>>>>>>> For > > > >>>>>>>>>>>> the > > > >>>>>>>>>>>>> implementation, can you raise a PR against kafka trunk > and > > > mark > > > >>>>>>> it > > > >>>>>>>> as > > > >>>>>>>>>>>> WIP? > > > >>>>>>>>>>>>> It will be easier to review what you have done. > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>> Damian > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On Thu, 4 May 2017 at 11:50 Kyle Winkelman < > > > >>>>>>>> winkelman.k...@gmail.com > > > >>>>>>>>>> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>>> I am replying to this in hopes it will draw some > attention > > > to > > > >>> my > > > >>>>>>>> KIP > > > >>>>>>>>>> as > > > >>>>>>>>>>>> I > > > >>>>>>>>>>>>>> haven't heard from anyone in a couple days. 
This is my > > first > > > >>> KIP > > > >>>>>>>> and > > > >>>>>>>>>> my > > > >>>>>>>>>>>>>> first large contribution to the project so I'm sure I > did > > > >>>>>>>> something > > > >>>>>>>>>>>> wrong. > > > >>>>>>>>>>>>>> ;) > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On May 1, 2017 4:18 PM, "Kyle Winkelman" < > > > >>>>>>>> winkelman.k...@gmail.com> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Hello all, > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> I have created KIP-150 to facilitate discussion about > > > adding > > > >>>>>>>>> cogroup > > > >>>>>>>>>> to > > > >>>>>>>>>>>>>>> the streams DSL. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Please find the KIP here: > > > >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > >>>>>>>>>>>>>>> 150+-+Kafka-Streams+Cogroup > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Please find my initial implementation here: > > > >>>>>>>>>>>>>>> https://github.com/KyleWinkelman/kafka > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>>> Kyle Winkelman > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>>> > > > >>>>>>>>> > > > >>>>>>>> > > > >>>>>>> > > > >>>>> > > > >>>>> > > > >>> > > > >>> > > > > > > > > >