Hi,

I am just catching up on this thread.

(1) As most people agree, we should not add anything to KStreamBuilder (btw: we actually plan to move #merge() to KStream and deprecate it on KStreamBuilder, as it's quite an unnatural API atm).
About specifying Serdes: there is still the idea to improve the overall API from the current "we are adding more overloads" pattern to a builder-like pattern. If we do this, it might make the whole discussion void. Thus, it might make sense to keep this in mind (or even delay this KIP?). It seems a waste of time to discuss all this if we are going to change it in 2 months anyway... Just saying.

-Matthias

On 6/13/17 8:05 AM, Michal Borowiecki wrote:
> You're right, I haven't thought of that.
>
> Cheers,
>
> Michał
>
>
> On 13/06/17 13:00, Kyle Winkelman wrote:
>> First, I would prefer not calling it aggregate because there are already
>> plenty of aggregate methods.
>>
>> Second, I don't think this would really work because after each aggregate
>> you now have a unique KTable (someone may want a table with 4 streams and
>> reuse those 4 in another table but with one more stream added) and unless
>> we completely duplicate everything every time this isn't really possible.
>> Additionally, the cogroup way just requires 1 more call to create another
>> table (normal, windowed, or session windowed); this new way would require
>> copying the aggregate chain.
>>
>> Another way to think about it is that with cogroup we know that when they
>> call aggregate they aren't going to be adding any more aggregators to that
>> table, but your way requires us to assume they are done adding aggregators
>> after each call, so we must return a KTable that may turn out never to be
>> needed.
>>
>> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com>
>> wrote:
>>
>>> Actually, just had a thought. It started with naming.
>>>
>>> Are we actually co-grouping these streams or are we co-aggregating them?
>>>
>>> After all, in each of the cogroup calls we are providing an Aggregator
>>> implementation.
>>>
>>> If they are really co-aggregated, why don't we turn this around:
>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>
>>> KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
>>>         aggValueSerde1) // this is the unchanged aggregate method
>>>     .aggregate(grouped2, aggregator2) // this is a new method
>>>     .aggregate(grouped3, aggregator3); // ditto
>>>
>>> This means that instead of adding cogroup methods on the KGroupedStream
>>> interface, we would add an aggregate method on the KTable interface.
>>>
>>> Is that feasible?
>>>
>>> Cheers,
>>> Michał
>>>
>>> On 13/06/17 10:56, Michal Borowiecki wrote:
>>>
>>> Also, I still feel that putting the initializer on the first cogroup can
>>> mislead users into thinking the first stream is in some way special.
>>> Just my 5c.
>>> Michał
>>>
>>> On 13/06/17 09:54, Michal Borowiecki wrote:
>>>
>>> Agree completely with the argument for serdes belonging in the same place
>>> as the state store name, which is in the aggregate method.
>>>
>>> Cheers,
>>>
>>> Michał
>>>
>>> On 12/06/17 18:20, Xavier Léauté wrote:
>>>
>>> I think we are discussing two separate things here, so it might be worth
>>> clarifying:
>>>
>>> 1) the position of the initializer with respect to the aggregators. If I
>>> understand correctly, Guozhang seems to think it is more natural to specify
>>> the initializer first, despite it not bearing any relation to the first
>>> aggregator. I can see the argument for specifying the initializer first,
>>> but I think it is debatable whether mixing it into the first cogroup call
>>> leads to a cleaner API or not.
>>>
>>> 2) where the serde should be defined (if necessary). Looking at our
>>> existing APIs in KGroupedStreams, we always offer two aggregate()
>>> methods.
>>> The first one takes the name of the store and the associated aggregate
>>> value serde, e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
>>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> aggValueSerde,
>>> String queryableStoreName)
>>> The second one only takes a state store supplier, and does not specify any
>>> serde, e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
>>> Aggregator<? super K, ? super V, VR> aggregator,
>>> final StateStoreSupplier<KeyValueStore> storeSupplier)
>>> Presumably, when specifying a state store supplier it shouldn't be
>>> necessary to specify an aggregate value serde, since the provided
>>> state store might not need to serialize the values (e.g. it may just keep
>>> them as regular objects in heap) or it may have its own
>>> internal serialization format.
>>>
>>> For consistency I think it would be valuable to preserve the same two
>>> aggregate methods for cogroup as well. Since the serde is only required in
>>> one of the two cases, I believe the serde has no place in the first
>>> cogroup() call and should only have to be specified as part of the
>>> aggregate() method that takes a state store name. In the case of a state
>>> store supplier, no serde would be necessary.
>>>
>>>
>>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> I'd agree that the aggregate value serde and the initializer do not
>>>> bear a direct relationship with the first `cogroup` call, but after I
>>>> tried to write some example code with these two different sets of APIs I
>>>> felt the current APIs just program more naturally.
>>>>
>>>> I know it is kinda subjective, but I do think that user experience may be
>>>> more important as a deciding factor than the logical argument for public
>>>> interfaces. So I'd recommend people also try writing some example lines,
>>>> and we can circle back and discuss which one feels more natural to write
>>>> code with.
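Guozhang suggests writing example lines to compare the API shapes. As one such experiment, here is a minimal plain-Java sketch of Xavier's two-variant argument, with no Kafka dependency. `Codec`, `Store`, `HeapStore`, `BytesStore` and `TwoOverloads` are hypothetical stand-ins (for a Serde, a state store, and the two `aggregate()` variants), not Kafka Streams classes. The variant that takes a store name has to build a serializing store itself, so it needs a value codec; the variant that takes a store supplier can be handed a heap-only store, so no codec parameter appears at all.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical stand-in for a Serde: only needed when a store must turn values into bytes. */
interface Codec<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
}

/** Hypothetical stand-in for a key-value state store. */
interface Store<K, T> {
    T get(K key);
    void put(K key, T value);
}

/** Keeps values as plain heap objects, so it never needs a codec. */
final class HeapStore<K, T> implements Store<K, T> {
    private final Map<K, T> map = new HashMap<>();
    public T get(K key) { return map.get(key); }
    public void put(K key, T value) { map.put(key, value); }
}

/** Keeps serialized bytes, so it cannot be built without a codec. */
final class BytesStore<K, T> implements Store<K, T> {
    final String name; // name the store would be registered under (unused in this sketch)
    private final Codec<T> codec;
    private final Map<K, byte[]> map = new HashMap<>();
    BytesStore(String name, Codec<T> codec) { this.name = name; this.codec = codec; }
    public T get(K key) {
        byte[] bytes = map.get(key);
        return bytes == null ? null : codec.decode(bytes);
    }
    public void put(K key, T value) { map.put(key, codec.encode(value)); }
}

final class TwoOverloads {
    /** Variant 1: store name plus value codec; the serializing store is built internally. */
    static <K, V, T> Store<K, T> aggregate(List<Map.Entry<K, V>> records, Supplier<T> initializer,
                                           BiFunction<T, V, T> aggregator,
                                           Codec<T> valueCodec, String storeName) {
        Supplier<Store<K, T>> named = () -> new BytesStore<>(storeName, valueCodec);
        return aggregate(records, initializer, aggregator, named);
    }

    /** Variant 2: the caller supplies the store, so no codec parameter is needed. */
    static <K, V, T> Store<K, T> aggregate(List<Map.Entry<K, V>> records, Supplier<T> initializer,
                                           BiFunction<T, V, T> aggregator,
                                           Supplier<Store<K, T>> storeSupplier) {
        Store<K, T> store = storeSupplier.get();
        for (Map.Entry<K, V> r : records) {
            T current = store.get(r.getKey());
            if (current == null) current = initializer.get(); // first record for this key
            store.put(r.getKey(), aggregator.apply(current, r.getValue()));
        }
        return store;
    }

    static final Codec<Integer> INT_CODEC = new Codec<Integer>() {
        public byte[] encode(Integer v) { return v.toString().getBytes(StandardCharsets.UTF_8); }
        public Integer decode(byte[] b) { return Integer.valueOf(new String(b, StandardCharsets.UTF_8)); }
    };

    /** Runs both variants over the same records and returns the sum for key "a" from each. */
    static int[] demo() {
        List<Map.Entry<String, Integer>> records =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        Supplier<Integer> zero = () -> 0;
        BiFunction<Integer, Integer, Integer> sum = Integer::sum;
        Store<String, Integer> named = aggregate(records, zero, sum, INT_CODEC, "sum-store");
        Store<String, Integer> supplied = aggregate(records, zero, sum, HeapStore::new);
        return new int[] {named.get("a"), supplied.get("a")};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

Both variants yield the same sums; only the store-name variant forces the caller to provide serialization, which mirrors why Xavier would keep the serde out of the first `cogroup()` call.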
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
>>>> michal.borowie...@openbet.com> wrote:
>>>>
>>>>> I feel it would make more sense to move the initializer and serde to the
>>>>> final aggregate statement, since the serde only applies to the state
>>>>> store, and the initializer doesn't bear any relation to the first group in
>>>>> particular.
>>>>>
>>>>> +1 for moving initializer and serde from cogroup() to the aggregate()
>>>>> for the reasons mentioned above.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Michał
>>>>>
>>>>> On 08/06/17 22:44, Guozhang Wang wrote:
>>>>>
>>>>> Note that although the internal `AbstractStoreSupplier` does maintain the
>>>>> key-value serdes, we do not enforce the interface of `StateStoreSupplier`
>>>>> to always retain that information, and hence we cannot assume that
>>>>> StateStoreSuppliers always retain key / value serdes.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io>
>>>>> wrote:
>>>>>
>>>>> Another reason for the serde not to be in the first cogroup call is that
>>>>> the serde should not be required if you pass a StateStoreSupplier to
>>>>> aggregate().
>>>>>
>>>>> Regarding the aggregated type <T>, I don't see why the initializer should
>>>>> be favored over the aggregator to define the type. In my mind, separating
>>>>> the initializer into the last aggregate call clearly indicates that the
>>>>> initializer is independent of any of the aggregators or streams and that
>>>>> we don't wait for grouped1 events to initialize the co-group.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> On second thought... This is the currently proposed API:
>>>>>
>>>>> ```
>>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
>>>>>                                    final Aggregator<? super K, ? super V, T> aggregator,
>>>>>                                    final Serde<T> aggValueSerde)
>>>>> ```
>>>>>
>>>>> If we do not have the initializer in the first co-group, it might be a
>>>>> bit awkward for users to specify the aggregator that returns a typed <T>
>>>>> value?
>>>>>
>>>>> Maybe it is still better to put these two functions in the same API?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> This suggestion lgtm. I would vote for the first alternative rather than
>>>>> adding it to the `KStreamBuilder` though.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io>
>>>>> wrote:
>>>>>
>>>>> I have a minor suggestion to make the API a little bit more symmetric.
>>>>> I feel it would make more sense to move the initializer and serde to
>>>>> the final aggregate statement, since the serde only applies to the state
>>>>> store, and the initializer doesn't bear any relation to the first group
>>>>> in particular. It would end up looking like this:
>>>>>
>>>>> KTable<K, CG> cogrouped =
>>>>>     grouped1.cogroup(aggregator1)
>>>>>             .cogroup(grouped2, aggregator2)
>>>>>             .cogroup(grouped3, aggregator3)
>>>>>             .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>
>>>>> Alternatively, we could move the first cogroup() method to
>>>>> KStreamBuilder, similar to how we have .merge(),
>>>>> and end up with an API that would be even more symmetric.
>>>>>
>>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
>>>>>               .cogroup(grouped2, aggregator2)
>>>>>               .cogroup(grouped3, aggregator3)
>>>>>               .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>
>>>>> This doesn't have to be a blocker, but I thought it would make the API
>>>>> just a tad cleaner.
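The symmetric shape Xavier proposes can be tried out in a small plain-Java sketch: each `cogroup()` call contributes only an input and its aggregator, and the single initializer arrives in the terminal `aggregate()`. `Agg`, `Grouped`, `Cogrouped`, `Summary` and `CogroupSketch` are hypothetical toy types, not the Kafka Streams API, and the sketch folds the inputs one after another into one shared map instead of interleaving records as a stream processor would.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical aggregator shape, mirroring (key, value, aggregate) -> aggregate. */
interface Agg<K, V, T> {
    T apply(K key, V value, T aggregate);
}

/** Hypothetical stand-in for a grouped input: its records, already keyed. */
final class Grouped<K, V> {
    final List<Map.Entry<K, V>> records;
    Grouped(List<Map.Entry<K, V>> records) { this.records = records; }

    /** First cogroup call takes only an aggregator; its return type fixes T. */
    <T> Cogrouped<K, T> cogroup(Agg<? super K, ? super V, T> aggregator) {
        return new Cogrouped<K, T>().cogroup(this, aggregator);
    }
}

/** Collects (input, aggregator) pairs; nothing runs until aggregate() supplies the initializer. */
final class Cogrouped<K, T> {
    private final List<BiConsumer<Map<K, T>, Supplier<T>>> steps = new ArrayList<>();

    <V> Cogrouped<K, T> cogroup(Grouped<K, V> input, Agg<? super K, ? super V, T> aggregator) {
        steps.add((store, initializer) -> {
            for (Map.Entry<K, V> r : input.records) {
                K key = r.getKey();
                T current = store.containsKey(key) ? store.get(key) : initializer.get();
                store.put(key, aggregator.apply(key, r.getValue(), current));
            }
        });
        return this;
    }

    /** Terminal call: the one initializer lives here, independent of any particular input. */
    Map<K, T> aggregate(Supplier<T> initializer) {
        Map<K, T> store = new HashMap<>(); // a single store shared by all inputs
        // Toy simplification: inputs are folded one after another, not interleaved by time.
        for (BiConsumer<Map<K, T>, Supplier<T>> step : steps) step.accept(store, initializer);
        return store;
    }
}

/** Hypothetical aggregate type combining two differently typed inputs. */
final class Summary {
    int clicks;
    String lastName;
}

final class CogroupSketch {
    static Map<String, Summary> demo() {
        Grouped<String, Integer> clicks = new Grouped<>(
                List.of(Map.entry("u1", 2), Map.entry("u2", 1), Map.entry("u1", 3)));
        Grouped<String, String> names = new Grouped<>(
                List.of(Map.entry("u1", "Ann"), Map.entry("u3", "Bo")));

        // Explicitly typed aggregators pin T = Summary for the first cogroup call.
        Agg<String, Integer, Summary> clickAgg = (k, v, s) -> { s.clicks += v; return s; };
        Agg<String, String, Summary> nameAgg = (k, v, s) -> { s.lastName = v; return s; };

        return clicks.cogroup(clickAgg)
                     .cogroup(names, nameAgg)
                     .aggregate(Summary::new);
    }

    public static void main(String[] args) {
        demo().forEach((k, s) -> System.out.println(k + " clicks=" + s.clicks + " name=" + s.lastName));
    }
}
```

In this shape the aggregate type `T` is fixed by the first aggregator, which bears on Guozhang's typing concern. With an implicitly typed lambda such as `(k, v, s) -> ...` in that first call, Java has nothing to infer `T` from and would choose `Object`, so the aggregator needs explicit types (as above) or the call needs a type witness such as `clicks.<Summary>cogroup(...)`.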
>>>>>
>>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kyle,
>>>>>
>>>>> Thanks a lot for the updated KIP. It looks good to me.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com>
>>>>> wrote:
>>>>>
>>>>> This makes much more sense to me. +1
>>>>>
>>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I have updated the KIP and my PR. Let me know what you think.
>>>>> To create a cogrouped stream, just call cogroup on a KGroupedStream and
>>>>> supply the initializer, aggValueSerde, and an aggregator. Then continue
>>>>> adding KGroupedStreams and aggregators. Then call one of the many
>>>>> aggregate calls to create a KTable.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi Kyle,
>>>>>
>>>>> Thanks for the update. I think just one initializer makes sense, as it
>>>>> should only be called once per key and generally it is just going to
>>>>> create a new instance of whatever the Aggregate class is.
>>>>>
>>>>> Cheers,
>>>>> Damian
>>>>>
>>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hello all,
>>>>>
>>>>> I have spent some more time on this and the best alternative I have
>>>>> come up with is:
>>>>> KGroupedStream has a single cogroup call that takes an initializer and
>>>>> an aggregator.
>>>>> CogroupedKStream has a cogroup call that takes additional groupedStream
>>>>> aggregator pairs.
>>>>> CogroupedKStream has multiple aggregate methods that create the
>>>>> different stores.
>>>>>
>>>>> I plan on updating the KIP, but I want people's input on whether we
>>>>> should have the initializer be passed in once at the beginning or
>>>>> instead have the initializer be required for each call to one of the
>>>>> aggregate calls. The first makes more sense to me but doesn't allow the
>>>>> user to specify different initializers for different tables.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Yeah, I really like that idea. I'll see what I can do to update the KIP
>>>>> and my PR when I have some time. I'm not sure how well creating the
>>>>> KStreamAggregates will go though, because at that point I will have
>>>>> thrown away the type of the values. It will be type safe; I just may
>>>>> need to do a little forcing.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Kyle,
>>>>>
>>>>> Thanks for the explanations; my previous read of the wiki examples was
>>>>> wrong.
>>>>>
>>>>> So I guess my motivation should be "reduced" to: can we move the window
>>>>> specs param from "KGroupedStream#cogroup(..)" to
>>>>> "CogroupedKStream#aggregate(..)"? My motivations are:
>>>>>
>>>>> 1. minor: we can reduce the number of generics in CogroupedKStream from
>>>>> 3 to 2.
>>>>> 2. major: this is for extensibility of the APIs, and since we are
>>>>> removing the "Evolving" annotations on Streams it may be harder to change
>>>>> it again in the future. The extended use cases are that people wanted to
>>>>> have windowed running aggregates on different granularities, e.g. "give
>>>>> me the counts per-minute, per-hour, per-day and per-week", and today in
>>>>> the DSL we need to specify that case in multiple aggregate operators,
>>>>> each of which gets its own state store / changelog, etc. It is possible
>>>>> to optimize this to a single state store as well. Its implementation
>>>>> would be tricky, as you need to contain windows of different lengths
>>>>> within your window store, but just from the public API point of view, it
>>>>> could be specified as:
>>>>>
>>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>>>>> "state-store-name");
>>>>>
>>>>> table1 = stream.aggregate(/*per-minute window*/)
>>>>> table2 = stream.aggregate(/*per-hour window*/)
>>>>> table3 = stream.aggregate(/*per-day window*/)
>>>>>
>>>>> while underneath we are only using a single store "state-store-name"
>>>>> for it.
>>>>>
>>>>> Although this feature is out of the scope of this KIP, I'd like to
>>>>> discuss if we can "leave the door open" to make such changes without
>>>>> modifying the public APIs.
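Guozhang's single-store idea can be illustrated with a minimal plain-Java sketch in which one map holds counts for several window granularities, by making the window length part of the store key. `MultiWindowCounts` and `WindowKey` are hypothetical names; windows are assumed to be tumbling and aligned to the epoch, and the layout is only an illustration, not how Kafka Streams window stores are organized.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class MultiWindowCounts {
    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;
    static final long DAY = 24 * HOUR;

    /** Store key: record key plus window length plus window start, so granularities never collide. */
    private static final class WindowKey {
        final String key;
        final long sizeMs;
        final long startMs;

        WindowKey(String key, long sizeMs, long startMs) {
            this.key = key;
            this.sizeMs = sizeMs;
            this.startMs = startMs;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof WindowKey)) return false;
            WindowKey w = (WindowKey) o;
            return key.equals(w.key) && sizeMs == w.sizeMs && startMs == w.startMs;
        }

        @Override public int hashCode() { return Objects.hash(key, sizeMs, startMs); }
    }

    private final long[] sizesMs;
    private final Map<WindowKey, Long> store = new HashMap<>(); // one store for every granularity

    MultiWindowCounts(long... sizesMs) { this.sizesMs = sizesMs.clone(); }

    /** Start of the epoch-aligned tumbling window of the given length that contains the timestamp. */
    private static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Counts one event into the window of every configured length that contains it. */
    void add(String key, long timestampMs) {
        for (long size : sizesMs) {
            store.merge(new WindowKey(key, size, windowStart(timestampMs, size)), 1L, Long::sum);
        }
    }

    /** Count for the window of the given length containing timestampMs; zero if never written. */
    long count(String key, long sizeMs, long timestampMs) {
        return store.getOrDefault(new WindowKey(key, sizeMs, windowStart(timestampMs, sizeMs)), 0L);
    }

    int storeSize() { return store.size(); }

    public static void main(String[] args) {
        MultiWindowCounts counts = new MultiWindowCounts(MINUTE, HOUR, DAY);
        for (long ts : new long[] {0L, 30_000L, 90_000L, 2 * HOUR}) counts.add("k", ts);
        System.out.println("minute=" + counts.count("k", MINUTE, 10_000L)
                + " hour=" + counts.count("k", HOUR, 0L)
                + " day=" + counts.count("k", DAY, 0L)
                + " entries=" + counts.storeSize());
    }
}
```

The trade-off in this sketch is that every event is written once per configured granularity, but all granularities live in one store rather than one store per aggregate operator.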
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I allow defining a single window/session window one time, when you make
>>>>> the cogroup call from a KGroupedStream. From then on you are using the
>>>>> cogroup call from within CogroupedKStream, which doesn't accept any
>>>>> additional windows/session windows.
>>>>> Is this what you meant by your question or did I misunderstand?
>>>>>
>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Another question that came to me is on "window alignment": from the KIP
>>>>> it seems you are allowing users to specify a (potentially different)
>>>>> window spec in each co-grouped input stream. So if these window specs
>>>>> are different, how should we "align" them across the different input
>>>>> streams? I think it is more natural to only specify one window spec in
>>>>>
>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>>>>>
>>>>> and remove it from the cogroup() functions. WDYT?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Thanks for the proposal, Kyle. This is quite a common use case: supporting
>>>>> such a multi-way table join (i.e. N source tables with N aggregate funcs)
>>>>> with a single store and N+1 serdes. I have seen lots of people using the
>>>>> low-level PAPI to achieve this goal.
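To make the window-alignment question concrete, here is a minimal plain-Java sketch in which the single window size is given only to the terminal aggregate call. Two inputs with different value types, each with its own aggregator, are bucketed with that one size, so their records fall into the same windows of a single store keyed by key and window start. `Rec`, `Input` and `AlignedWindows` are hypothetical; windows are assumed tumbling and epoch-aligned, and the string store key is only for readability. The initializer counter also shows it running once per (key, window), in line with Damian's remark about one initializer per key.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Hypothetical timestamped record from one input. */
final class Rec<V> {
    final String key;
    final long ts;
    final V value;
    Rec(String key, long ts, V value) { this.key = key; this.ts = ts; this.value = value; }
}

/** Hypothetical input: its records plus the aggregator that folds them into the shared aggregate T. */
final class Input<V, T> {
    final List<Rec<V>> records;
    final BiFunction<T, V, T> aggregator;
    Input(List<Rec<V>> records, BiFunction<T, V, T> aggregator) {
        this.records = records;
        this.aggregator = aggregator;
    }
}

final class AlignedWindows {
    /** One window size for every input, so all inputs share the same window boundaries. */
    static <T> Map<String, T> aggregate(List<Input<?, T>> inputs, long windowSizeMs, Supplier<T> initializer) {
        Map<String, T> store = new TreeMap<>(); // single store keyed by "key@windowStart"
        for (Input<?, T> input : inputs) fold(input, windowSizeMs, initializer, store);
        return store;
    }

    // Capture helper: binds the input's value type V so its aggregator can be applied.
    private static <V, T> void fold(Input<V, T> input, long sizeMs, Supplier<T> initializer,
                                    Map<String, T> store) {
        for (Rec<V> r : input.records) {
            long start = r.ts - Math.floorMod(r.ts, sizeMs); // epoch-aligned tumbling window
            String storeKey = r.key + "@" + start;
            T current = store.containsKey(storeKey) ? store.get(storeKey) : initializer.get();
            store.put(storeKey, input.aggregator.apply(current, r.value));
        }
    }

    /** Clicks carry counts, purchases carry item names; both add to one activity total. */
    static Map<String, Long> demo(int[] initializerCalls) {
        List<Rec<Integer>> clickRecs = List.of(new Rec<>("u1", 10_000L, 2), new Rec<>("u1", 70_000L, 1));
        List<Rec<String>> purchaseRecs = List.of(new Rec<>("u1", 50_000L, "book"), new Rec<>("u2", 65_000L, "pen"));
        Input<Integer, Long> clicks = new Input<>(clickRecs, (total, n) -> total + n);
        Input<String, Long> purchases = new Input<>(purchaseRecs, (total, item) -> total + 1);
        List<Input<?, Long>> inputs = List.of(clicks, purchases);
        Supplier<Long> zero = () -> { initializerCalls[0]++; return 0L; };
        return aggregate(inputs, 60_000L, zero);
    }

    public static void main(String[] args) {
        int[] calls = new int[1];
        System.out.println(demo(calls) + " initializer calls=" + calls[0]);
    }
}
```

If each input carried its own window spec instead, there would be no single set of window boundaries to fold both inputs into, which is the ambiguity the question points at.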
>>>>>
>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I like your point about not handling other cases such as count and
>>>>> reduce.
>>>>>
>>>>> I think that reduce may not make sense because reduce assumes that the
>>>>> input values are the same as the output values. With cogroup
>>>>> ...
>
> --
> Michal Borowiecki
> Senior Software Engineer L4
> OpenBet Ltd