+1 Thanks, Bill
On Wed, Jun 14, 2017 at 8:10 PM, Xavier Léauté <xav...@confluent.io> wrote: > +1 from me > > any stream should be able to initialize the cogroup > > On Wed, Jun 14, 2017 at 3:44 PM Kyle Winkelman <winkelman.k...@gmail.com> > wrote: > > > I will update the kip to have only the aggregator in the first cogroup > call > > and the initializer and serde in the aggregate calls. > > > > This seems to be the consensus on the email chain. > > > > Thanks, > > Kyle > > > > On Jun 14, 2017 5:41 PM, wrote: > > > > That is not the case. No matter which stream the record comes in on the > > initializer is called if there is not currently an object in the store. > > > > On Jun 14, 2017 5:12 PM, "Guozhang Wang" <wangg...@gmail.com> wrote: > > > > While regarding where we should ask users to set serdes: I think I'm > > convinced by Xavier that they should be in the `aggregate` call (but only > > those does not pass in a state store supplier) instead of the > > `KStream#cogroup` call to be consistent with other aggregate functions. > > > > BTW another motivation for me to suggest keeping the initializer on the > > first stream is that by reviewing the PR (some time ago though, so again > I > > might be wrong) we will trigger the initializer only when we received an > > incoming record from the first stream whose key is not in the state store > > yet, while for other streams we will just drop it on the floor. If that > is > > actually not the case, that we call initializer on any one of the > > co-grouped streams' incoming records, then I'm open to set the > initializer > > at the `aggregate` call as well. > > > > > > Guozhang > > > > On Wed, Jun 14, 2017 at 2:23 PM, Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > I'd suggest we do not block this KIP until the serde work has been > sorted > > > out: we cannot estimate yet how long it will take yet. Instead let's > say > > > make an agreement on where we want to specify the serdes: whether on > the > > > first co-group call or on the aggregate call. > > > > > > Also about the initializer specification I actually felt that the first > > > cogrouped stream is special (Kyle please feel free to correct me if I'm > > > wrong) and that is why I thought it is better to specify the > initializer > > at > > > the beginning: since from the typing you can see that the final > > aggregated > > > value type is defined to be the same as the first co-grouped stream, > and > > > for any intermediate stream to co-group, their value types not be > > inherited > > > but the value be "incorporated" into the original stream: > > > > > > <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> > > > groupedStream, final Aggregator<? super K, ? super T, V> aggregator) > > > > > > Note that we do not have a cogroup function that returns > > > CogroupedKStream<K, T>. > > > > > > > > > Guozhang > > > > > > > > > On Tue, Jun 13, 2017 at 2:31 PM, Bill Bejeck <bbej...@gmail.com> > wrote: > > > > > >> +1 on deferring discussion on Serdes until API improvements are ironed > > >> out. > > >> > > >> On Tue, Jun 13, 2017 at 2:06 PM, Matthias J. Sax < > matth...@confluent.io > > > > > >> wrote: > > >> > > >> > Hi, > > >> > > > >> > I am just catching up on this thread. (1) as most people agree, we > > >> > should not add anything to KStreamBuilder (btw: we actually plan to > > move > > >> > #merge() to KStream and deprecate it on KStreamBuilder as it's a > quite > > >> > unnatural API atm). > > >> > > > >> > About specifying Serdes: there is still the idea to improve to > overall > > >> > API from the current "we are adding more overloads"-pattern to a > > >> > builder-like pattern. This might make the whole discussion void if > we > > do > > >> > this. Thus, it might make sense to keep this in mind (or even delay > > this > > >> > KIP?). It seems a waste of time to discuss all this if we are going > to > > >> > chance it in 2 month anyway... Just saying. > > >> > > > >> > > > >> > -Matthias > > >> > > > >> > On 6/13/17 8:05 AM, Michal Borowiecki wrote: > > >> > > You're right, I haven't thought of that. > > >> > > > > >> > > Cheers, > > >> > > > > >> > > Michał > > >> > > > > >> > > > > >> > > On 13/06/17 13:00, Kyle Winkelman wrote: > > >> > >> First, I would prefer not calling it aggregate because there are > > >> already > > >> > >> plenty of aggregate methods. > > >> > >> > > >> > >> Second, I dont think this would really work because after each > > >> aggregate > > >> > >> you now have a unique KTable (someone may want a table with 4 > > streams > > >> > and > > >> > >> reuse those 4 in another table but with one more stream added) > and > > >> > unless > > >> > >> we completely duplicate everything every time this isnt really > > >> possible. > > >> > >> Additionally, the cogroup way just requires 1 more call to create > > two > > >> > >> different tables (normal, windowed, and session windowed) this > new > > >> way > > >> > >> would require copying the aggregate chain. > > >> > >> > > >> > >> Another way to think about it is with cogroup we know that when > > they > > >> > call > > >> > >> aggregate they arent going to be adding any more aggregators to > > that > > >> > table > > >> > >> but your way requires us to assume they are done adding > aggregators > > >> > after > > >> > >> each call so we must return a ktable just to possibly not need to > > >> have > > >> > >> created it. > > >> > >> > > >> > >> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" < > > >> > michal.borowie...@openbet.com> > > >> > >> wrote: > > >> > >> > > >> > >>> Actually, just had a thought. It started with naming. > > >> > >>> > > >> > >>> Are we actually co-grouping these streams or are we > co-aggregating > > >> > them? > > >> > >>> > > >> > >>> After all, in each of the cogroup calls we are providing an > > >> Aggregator > > >> > >>> implementation. > > >> > >>> > > >> > >>> > > >> > >>> If they are really co-aggregated, why don't we turn this around: > > >> > >>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1"). > > >> > groupByKey(); > > >> > >>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2"). > > >> > groupByKey(); > > >> > >>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3"). > > >> > groupByKey(); > > >> > >>> > > >> > >>> KTable<K, CG> coagg = grouped1.aggregate(initializer1, > > aggregator1, > > >> > >>> aggValueSerde1) // this is the unchanged aggregate method > > >> > >>> .aggregate(grouped2, aggregator2) // this is a new > method > > >> > >>> .aggregate(grouped3, aggregator3); // ditto > > >> > >>> > > >> > >>> This means instead of adding cogroup methods on KGroupStream > > >> interface, > > >> > >>> adding aggregate method on KTable interface. > > >> > >>> > > >> > >>> Is that feasible? > > >> > >>> > > >> > >>> Cheers, > > >> > >>> Michał > > >> > >>> > > >> > >>> On 13/06/17 10:56, Michal Borowiecki wrote: > > >> > >>> > > >> > >>> Also, I still feel that putting initializer on the first cogroup > > can > > >> > >>> mislead users into thinking the first stream is in some way > > special. > > >> > >>> Just my 5c. > > >> > >>> Michał > > >> > >>> > > >> > >>> On 13/06/17 09:54, Michal Borowiecki wrote: > > >> > >>> > > >> > >>> Agree completely with the argument for serdes belonging in the > > same > > >> > place > > >> > >>> as the state store name, which is in the aggregate method. > > >> > >>> > > >> > >>> Cheers, > > >> > >>> > > >> > >>> Michał > > >> > >>> > > >> > >>> On 12/06/17 18:20, Xavier Léauté wrote: > > >> > >>> > > >> > >>> I think we are discussing two separate things here, so it might > be > > >> > worth > > >> > >>> clarifying: > > >> > >>> > > >> > >>> 1) the position of the initializer with respect to the > > aggregators. > > >> If > > >> > I > > >> > >>> understand correctly, Guozhang seems to think it is more natural > > to > > >> > specify > > >> > >>> the initializer first, despite it not bearing any relation to > the > > >> first > > >> > >>> aggregator. I can see the argument for specifying the > initializer > > >> > first, > > >> > >>> but I think it is debatable whether mixing it into the first > > cogroup > > >> > call > > >> > >>> leads to a cleaner API or not. > > >> > >>> > > >> > >>> 2) where the serde should be defined (if necessary). Looking at > > our > > >> > >>> existing APIs in KGroupedStreams, we always offer two > aggregate() > > >> > >>> methods. The first one takes the name of the store and > associated > > >> > aggregate > > >> > >>> value serde e.g. KGroupedStream.aggregate(Initializer<VR> > > >> initializer, > > >> > >>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> > > >> > aggValueSerde, > > >> > >>> String queryableStoreName) > > >> > >>> The second one only takes a state store supplier, and does not > > >> specify > > >> > any > > >> > >>> serde, e.g. KGroupedStream.aggregate(Initializer<VR> > > >> > >>> initializer, Aggregator<? super K, ? super V, VR> aggregator, > > final > > >> > >>> StateStoreSupplier<KeyValueStore> storeSupplier) > > >> > >>> Presumably, when specifying a state store supplier it shouldn't > be > > >> > >>> necessary to specify an aggregate value serde, since the > provided > > >> > >>> statestore might not need to serialize the values (e.g. it may > > just > > >> > keep > > >> > >>> them as regular objects in heap) or it may have its own > > >> > >>> internal serialization format. > > >> > >>> > > >> > >>> For consistency I think it would be valuable to preserve the > same > > >> two > > >> > >>> aggregate methods for cogroup as well. Since the serde is only > > >> > required in > > >> > >>> one of the two cases, I believe the serde has no place in the > > first > > >> > >>> cogroup() call and should only have to be specified as part of > the > > >> > >>> aggregate() method that takes a state store name. In the case > of a > > >> > state > > >> > >>> store supplier, no serde would be necessary. > > >> > >>> > > >> > >>> > > >> > >>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang < > wangg...@gmail.com > > > > > >> > wrote: > > >> > >>> > > >> > >>>> I'd agree that the aggregate value serde and the initializer > does > > >> not > > >> > >>>> bear direct relationship with the first `cogroup` calls, but > > after > > >> I > > >> > tried > > >> > >>>> to write some example code with these two different set of > APIs I > > >> > felt the > > >> > >>>> current APIs just program more naturally. > > >> > >>>> > > >> > >>>> I know it is kinda subjective, but I do think that user > > experience > > >> > may be > > >> > >>>> more important as a deciding factor than the logical argument > for > > >> > public > > >> > >>>> interfaces. So I'd recommend people to also try out writing > some > > >> > example > > >> > >>>> lines also and we can circle back and discuss which one feels > > more > > >> > natural > > >> > >>>> to write code. > > >> > >>>> > > >> > >>>> > > >> > >>>> Guozhang > > >> > >>>> > > >> > >>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki < > > >> > >>>> michal.borowie...@openbet.com> wrote: > > >> > >>>> > > >> > >>>>> I feel it would make more sense to move the initializer and > > serde > > >> to > > >> > the > > >> > >>>>> final aggregate statement, since the serde only applies to the > > >> state > > >> > >>>>> store, > > >> > >>>>> and the initializer doesn't bear any relation to the first > group > > >> in > > >> > >>>>> particular. > > >> > >>>>> > > >> > >>>>> +1 for moving initializer and serde from cogroup() to the > > >> aggregate() > > >> > >>>>> for the reasons mentioned above. > > >> > >>>>> > > >> > >>>>> Cheers, > > >> > >>>>> > > >> > >>>>> Michał > > >> > >>>>> > > >> > >>>>> On 08/06/17 22:44, Guozhang Wang wrote: > > >> > >>>>> > > >> > >>>> Note that although the internal `AbstractStoreSupplier` does > > >> maintain > > >> > the > > >> > >>>>> key-value serdes, we do not enforce the interface of > > >> > `StateStoreSupplier` > > >> > >>>>> to always retain that information, and hence we cannot assume > > that > > >> > >>>>> StateStoreSuppliers always retain key / value serdes. > > >> > >>>>> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté < > > >> xav...@confluent.io> > > >> > <xav...@confluent.io> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Another reason for the serde not to be in the first cogroup > > call, > > >> is > > >> > that > > >> > >>>>> the serde should not be required if you pass a > > StateStoreSupplier > > >> to > > >> > >>>>> aggregate() > > >> > >>>>> > > >> > >>>>> Regarding the aggregated type <T> I don't the why initializer > > >> should > > >> > be > > >> > >>>>> favored over aggregator to define the type. In my mind > > separating > > >> the > > >> > >>>>> initializer into the last aggregate call clearly indicates > that > > >> the > > >> > >>>>> initializer is independent of any of the aggregators or > streams > > >> and > > >> > that we > > >> > >>>>> don't wait for grouped1 events to initialize the co-group. > > >> > >>>>> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang < > > wangg...@gmail.com > > > > > >> < > > >> > wangg...@gmail.com> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> On a second thought... This is the current proposal API > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> ``` > > >> > >>>>> > > >> > >>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> > > >> initializer, > > >> > >>>>> > > >> > >>>>> final > > >> > >>>>> > > >> > >>>>> Aggregator<? super K, ? super V, T> aggregator, final Serde<T> > > >> > >>>>> aggValueSerde) > > >> > >>>>> > > >> > >>>>> ``` > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> If we do not have the initializer in the first co-group it > might > > >> be > > >> > a bit > > >> > >>>>> awkward for users to specify the aggregator that returns a > typed > > >> <T> > > >> > >>>>> > > >> > >>>>> value? > > >> > >>>>> > > >> > >>>>> Maybe it is still better to put these two functions in the > same > > >> api? > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Guozhang > > >> > >>>>> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang < > > >> wangg...@gmail.com> > > >> > <wangg...@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> This suggestion lgtm. I would vote for the first alternative > > than > > >> > >>>>> > > >> > >>>>> adding > > >> > >>>>> > > >> > >>>>> it to the `KStreamBuilder` though. > > >> > >>>>> > > >> > >>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté < > > >> xav...@confluent.io> > > >> > <xav...@confluent.io> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> I have a minor suggestion to make the API a little bit more > > >> > symmetric. > > >> > >>>>> I feel it would make more sense to move the initializer and > > serde > > >> to > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> final aggregate statement, since the serde only applies to the > > >> state > > >> > >>>>> store, > > >> > >>>>> and the initializer doesn't bear any relation to the first > group > > >> in > > >> > >>>>> particular. It would end up looking like this: > > >> > >>>>> > > >> > >>>>> KTable<K, CG> cogrouped = > > >> > >>>>> grouped1.cogroup(aggregator1) > > >> > >>>>> .cogroup(grouped2, aggregator2) > > >> > >>>>> .cogroup(grouped3, aggregator3) > > >> > >>>>> .aggregate(initializer1, aggValueSerde, > storeName1); > > >> > >>>>> > > >> > >>>>> Alternatively, we could move the the first cogroup() method to > > >> > >>>>> KStreamBuilder, similar to how we have .merge() > > >> > >>>>> and end up with an api that would be even more symmetric. > > >> > >>>>> > > >> > >>>>> KStreamBuilder.cogroup(grouped1, aggregator1) > > >> > >>>>> .cogroup(grouped2, aggregator2) > > >> > >>>>> .cogroup(grouped3, aggregator3) > > >> > >>>>> .aggregate(initializer1, aggValueSerde, > > storeName1); > > >> > >>>>> > > >> > >>>>> This doesn't have to be a blocker, but I thought it would make > > the > > >> > API > > >> > >>>>> just > > >> > >>>>> a tad cleaner. > > >> > >>>>> > > >> > >>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang < > > wangg...@gmail.com> > > >> < > > >> > wangg...@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> Kyle, > > >> > >>>>> > > >> > >>>>> Thanks a lot for the updated KIP. It looks good to me. > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Guozhang > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski < > j...@jagunet.com> > > < > > >> > j...@jagunet.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> This makes much more sense to me. +1 > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman < > > >> > >>>>> > > >> > >>>>> winkelman.k...@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> I have updated the KIP and my PR. Let me know what you think. > > >> > >>>>> To created a cogrouped stream just call cogroup on a > > >> > >>>>> > > >> > >>>>> KgroupedStream > > >> > >>>>> > > >> > >>>>> and > > >> > >>>>> > > >> > >>>>> supply the initializer, aggValueSerde, and an aggregator. Then > > >> > >>>>> > > >> > >>>>> continue > > >> > >>>>> > > >> > >>>>> adding kgroupedstreams and aggregators. Then call one of the > > >> > >>>>> > > >> > >>>>> many > > >> > >>>>> > > >> > >>>>> aggregate > > >> > >>>>> > > >> > >>>>> calls to create a KTable. > > >> > >>>>> > > >> > >>>>> Thanks, > > >> > >>>>> Kyle > > >> > >>>>> > > >> > >>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> < > > >> > damian....@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> Hi Kyle, > > >> > >>>>> > > >> > >>>>> Thanks for the update. I think just one initializer makes > sense > > >> > >>>>> > > >> > >>>>> as > > >> > >>>>> > > >> > >>>>> it > > >> > >>>>> > > >> > >>>>> should only be called once per key and generally it is just > > >> > >>>>> > > >> > >>>>> going > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> create > > >> > >>>>> > > >> > >>>>> a new instance of whatever the Aggregate class is. > > >> > >>>>> > > >> > >>>>> Cheers, > > >> > >>>>> Damian > > >> > >>>>> > > >> > >>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman < > > >> > >>>>> > > >> > >>>>> winkelman.k...@gmail.com > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Hello all, > > >> > >>>>> > > >> > >>>>> I have spent some more time on this and the best alternative I > > >> > >>>>> > > >> > >>>>> have > > >> > >>>>> > > >> > >>>>> come > > >> > >>>>> > > >> > >>>>> up > > >> > >>>>> > > >> > >>>>> with is: > > >> > >>>>> KGroupedStream has a single cogroup call that takes an > > >> > >>>>> > > >> > >>>>> initializer > > >> > >>>>> > > >> > >>>>> and > > >> > >>>>> > > >> > >>>>> an > > >> > >>>>> > > >> > >>>>> aggregator. > > >> > >>>>> CogroupedKStream has a cogroup call that takes additional > > >> > >>>>> > > >> > >>>>> groupedStream > > >> > >>>>> > > >> > >>>>> aggregator pairs. > > >> > >>>>> CogroupedKStream has multiple aggregate methods that create > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> different > > >> > >>>>> > > >> > >>>>> stores. > > >> > >>>>> > > >> > >>>>> I plan on updating the kip but I want people's input on if we > > >> > >>>>> > > >> > >>>>> should > > >> > >>>>> > > >> > >>>>> have > > >> > >>>>> > > >> > >>>>> the initializer be passed in once at the beginning or if we > > >> > >>>>> > > >> > >>>>> should > > >> > >>>>> > > >> > >>>>> instead > > >> > >>>>> > > >> > >>>>> have the initializer be required for each call to one of the > > >> > >>>>> > > >> > >>>>> aggregate > > >> > >>>>> > > >> > >>>>> calls. The first makes more sense to me but doesnt allow the > > >> > >>>>> > > >> > >>>>> user > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> specify different initializers for different tables. > > >> > >>>>> > > >> > >>>>> Thanks, > > >> > >>>>> Kyle > > >> > >>>>> > > >> > >>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" < > > >> > >>>>> > > >> > >>>>> winkelman.k...@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Yea I really like that idea I'll see what I can do to update > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> kip > > >> > >>>>> > > >> > >>>>> and > > >> > >>>>> > > >> > >>>>> my pr when I have some time. I'm not sure how well creating > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> kstreamaggregates will go though because at that point I will > > >> > >>>>> > > >> > >>>>> have > > >> > >>>>> > > >> > >>>>> thrown > > >> > >>>>> > > >> > >>>>> away the type of the values. It will be type safe I just may > > >> > >>>>> > > >> > >>>>> need to > > >> > >>>>> > > >> > >>>>> do a > > >> > >>>>> > > >> > >>>>> little forcing. > > >> > >>>>> > > >> > >>>>> Thanks, > > >> > >>>>> Kyle > > >> > >>>>> > > >> > >>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> Kyle, > > >> > >>>>> > > >> > >>>>> Thanks for the explanations, my previous read on the wiki > > >> > >>>>> > > >> > >>>>> examples > > >> > >>>>> > > >> > >>>>> was > > >> > >>>>> > > >> > >>>>> wrong. > > >> > >>>>> > > >> > >>>>> So I guess my motivation should be "reduced" to: can we move > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> window > > >> > >>>>> > > >> > >>>>> specs param from "KGroupedStream#cogroup(..)" to > > >> > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are: > > >> > >>>>> > > >> > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream > > >> > >>>>> > > >> > >>>>> from > > >> > >>>>> > > >> > >>>>> 3 > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> 2. > > >> > >>>>> > > >> > >>>>> 2. major: this is for extensibility of the APIs, and since > > >> > >>>>> > > >> > >>>>> we > > >> > >>>>> > > >> > >>>>> are > > >> > >>>>> > > >> > >>>>> removing > > >> > >>>>> > > >> > >>>>> the "Evolving" annotations on Streams it may be harder to > > >> > >>>>> > > >> > >>>>> change it > > >> > >>>>> > > >> > >>>>> again > > >> > >>>>> > > >> > >>>>> in the future. The extended use cases are that people wanted > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> have > > >> > >>>>> > > >> > >>>>> windowed running aggregates on different granularities, e.g. > > >> > >>>>> > > >> > >>>>> "give > > >> > >>>>> > > >> > >>>>> me > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> counts per-minute, per-hour, per-day and per-week", and > > >> > >>>>> > > >> > >>>>> today > > >> > >>>>> > > >> > >>>>> in > > >> > >>>>> > > >> > >>>>> DSL > > >> > >>>>> > > >> > >>>>> we > > >> > >>>>> > > >> > >>>>> need to specify that case in multiple aggregate operators, > > >> > >>>>> > > >> > >>>>> which > > >> > >>>>> > > >> > >>>>> gets > > >> > >>>>> > > >> > >>>>> a > > >> > >>>>> > > >> > >>>>> state store / changelog, etc. And it is possible to optimize > > >> > >>>>> > > >> > >>>>> it > > >> > >>>>> > > >> > >>>>> as > > >> > >>>>> > > >> > >>>>> well > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> a single state store. Its implementation would be tricky as > > >> > >>>>> > > >> > >>>>> you > > >> > >>>>> > > >> > >>>>> need > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> contain different lengthed windows within your window store > > >> > >>>>> > > >> > >>>>> but > > >> > >>>>> > > >> > >>>>> just > > >> > >>>>> > > >> > >>>>> from > > >> > >>>>> > > >> > >>>>> the public API point of view, it could be specified as: > > >> > >>>>> > > >> > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ... > > >> > >>>>> "state-store-name"); > > >> > >>>>> > > >> > >>>>> table1 = stream.aggregate(/*per-minute window*/) > > >> > >>>>> table2 = stream.aggregate(/*per-hour window*/) > > >> > >>>>> table3 = stream.aggregate(/*per-day window*/) > > >> > >>>>> > > >> > >>>>> while underlying we are only using a single store > > >> > >>>>> > > >> > >>>>> "state-store-name" > > >> > >>>>> > > >> > >>>>> for > > >> > >>>>> > > >> > >>>>> it. > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Although this feature is out of the scope of this KIP, I'd > > >> > >>>>> > > >> > >>>>> like > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> discuss > > >> > >>>>> > > >> > >>>>> if we can "leave the door open" to make such changes without > > >> > >>>>> > > >> > >>>>> modifying > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> public APIs . > > >> > >>>>> > > >> > >>>>> Guozhang > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman < > > >> > >>>>> > > >> > >>>>> winkelman.k...@gmail.com > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> I allow defining a single window/sessionwindow one time > > >> > >>>>> > > >> > >>>>> when > > >> > >>>>> > > >> > >>>>> you > > >> > >>>>> > > >> > >>>>> make > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> cogroup call from a KGroupedStream. From then on you are > > >> > >>>>> > > >> > >>>>> using > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> cogroup > > >> > >>>>> > > >> > >>>>> call from with in CogroupedKStream which doesnt accept any > > >> > >>>>> > > >> > >>>>> additional > > >> > >>>>> > > >> > >>>>> windows/sessionwindows. > > >> > >>>>> > > >> > >>>>> Is this what you meant by your question or did I > > >> > >>>>> > > >> > >>>>> misunderstand? > > >> > >>>>> > > >> > >>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" < > > >> > >>>>> > > >> > >>>>> wangg...@gmail.com > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> Another question that came to me is on "window alignment": > > >> > >>>>> > > >> > >>>>> from > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> KIP > > >> > >>>>> > > >> > >>>>> it > > >> > >>>>> > > >> > >>>>> seems you are allowing users to specify a (potentially > > >> > >>>>> > > >> > >>>>> different) > > >> > >>>>> > > >> > >>>>> window > > >> > >>>>> > > >> > >>>>> spec in each co-grouped input stream. So if these window > > >> > >>>>> > > >> > >>>>> specs > > >> > >>>>> > > >> > >>>>> are > > >> > >>>>> > > >> > >>>>> different how should we "align" them with different input > > >> > >>>>> > > >> > >>>>> streams? I > > >> > >>>>> > > >> > >>>>> think > > >> > >>>>> > > >> > >>>>> it is more natural to only specify on window spec in the > > >> > >>>>> > > >> > >>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows); > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> And remove it from the cogroup() functions. WDYT? > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> Guozhang > > >> > >>>>> > > >> > >>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang < > > >> > >>>>> > > >> > >>>>> wangg...@gmail.com> > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> Thanks for the proposal Kyle, this is a quite common use > > >> > >>>>> > > >> > >>>>> case > > >> > >>>>> > > >> > >>>>> to > > >> > >>>>> > > >> > >>>>> support > > >> > >>>>> > > >> > >>>>> such multi-way table join (i.e. N source tables with N > > >> > >>>>> > > >> > >>>>> aggregate > > >> > >>>>> > > >> > >>>>> func) > > >> > >>>>> > > >> > >>>>> with > > >> > >>>>> > > >> > >>>>> a single store and N+1 serdes, I have seen lots of people > > >> > >>>>> > > >> > >>>>> using > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> low-level PAPI to achieve this goal. > > >> > >>>>> > > >> > >>>>> > > >> > >>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman < > > >> > >>>>> > > >> > >>>>> winkelman.k...@gmail.com > > >> > >>>>> > > >> > >>>>> wrote: > > >> > >>>>> > > >> > >>>>> I like your point about not handling other cases such as > > >> > >>>>> > > >> > >>>>> count > > >> > >>>>> > > >> > >>>>> and > > >> > >>>>> > > >> > >>>>> reduce. > > >> > >>>>> > > >> > >>>>> I think that reduce may not make sense because reduce > > >> > >>>>> > > >> > >>>>> assumes > > >> > >>>>> > > >> > >>>>> that > > >> > >>>>> > > >> > >>>>> the > > >> > >>>>> > > >> > >>>>> input values are the same as the output values. With > > >> > >>>>> > > >> > >>>>> cogroup > > >> > >>>>> > > >> > >>>>> ... > > >> > > > > >> > > -- > > >> > > Signature > > >> > > <http://www.openbet.com/> Michal Borowiecki > > >> > > Senior Software Engineer L4 > > >> > > T: +44 208 742 1600 <+44%2020%208742%201600> > > >> > > > > >> > > > > >> > > +44 203 249 8448 <+44%2020%203249%208448> > > >> > > > > >> > > > > >> > > > > >> > > E: michal.borowie...@openbet.com > > >> > > W: www.openbet.com <http://www.openbet.com/> > > >> > > > > >> > > > > >> > > OpenBet Ltd > > >> > > > > >> > > Chiswick Park Building 9 > > >> > > > > >> > > 566 Chiswick High Rd > > >> > > > > >> > > London > > >> > > > > >> > > W4 5XT > > >> > > > > >> > > UK > > >> > > > > >> > > > > >> > > <https://www.openbet.com/email_promo> > > >> > > > > >> > > This message is confidential and intended only for the addressee. > If > > >> you > > >> > > have received this message in error, please immediately notify the > > >> > > postmas...@openbet.com <mailto:postmas...@openbet.com> and delete > > it > > >> > > from your system as well as any copies. The content of e-mails as > > well > > >> > > as traffic data may be monitored by OpenBet for employment and > > >> security > > >> > > purposes. To protect the environment please do not print this > e-mail > > >> > > unless necessary. OpenBet Ltd. Registered Office: Chiswick Park > > >> Building > > >> > > 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A > company > > >> > > registered in England and Wales. Registered no. 3134634. VAT no. > > >> > > GB927523612 > > >> > > > > >> > > > >> > > > >> > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > > > -- > > -- Guozhang > > >