+1 from me

any stream should be able to initialize the cogroup

On Wed, Jun 14, 2017 at 3:44 PM Kyle Winkelman <winkelman.k...@gmail.com>
wrote:

> I will update the kip to have only the aggregator in the first cogroup call
> and the initializer and serde in the aggregate calls.
>
> This seems to be the consensus on the email chain.
>
> Thanks,
> Kyle
>
> On Jun 14, 2017 5:41 PM, wrote:
>
> That is not the case. No matter which stream the record comes in on the
> initializer is called if there is not currently an object in the store.
>
> On Jun 14, 2017 5:12 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>
> While regarding where we should ask users to set serdes: I think I'm
> convinced by Xavier that they should be in the `aggregate` call (but only
> those does not pass in a state store supplier) instead of the
> `KStream#cogroup` call to be consistent with other aggregate functions.
>
> BTW another motivation for me to suggest keeping the initializer on the
> first stream is that by reviewing the PR (some time ago though, so again I
> might be wrong) we will trigger the initializer only when we received an
> incoming record from the first stream whose key is not in the state store
> yet, while for other streams we will just drop it on the floor. If that is
> actually not the case, that we call initializer on any one of the
> co-grouped streams' incoming records, then I'm open to set the initializer
> at the `aggregate` call as well.
>
>
> Guozhang
>
> On Wed, Jun 14, 2017 at 2:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I'd suggest we do not block this KIP until the serde work has been sorted
> > out: we cannot estimate yet how long it will take yet. Instead let's say
> > make an agreement on where we want to specify the serdes: whether on the
> > first co-group call or on the aggregate call.
> >
> > Also about the initializer specification I actually felt that the first
> > cogrouped stream is special (Kyle please feel free to correct me if I'm
> > wrong) and that is why I thought it is better to specify the initializer
> at
> > the beginning: since from the typing you can see that the final
> aggregated
> > value type is defined to be the same as the first co-grouped stream, and
> > for any intermediate stream to co-group, their value types not be
> inherited
> > but the value be "incorporated" into the original stream:
> >
> >  <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T>
> > groupedStream, final Aggregator<? super K, ? super T, V> aggregator)
> >
> > Note that we do not have a cogroup function that returns
> > CogroupedKStream<K, T>.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jun 13, 2017 at 2:31 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> >> +1 on deferring discussion on Serdes until API improvements are ironed
> >> out.
> >>
> >> On Tue, Jun 13, 2017 at 2:06 PM, Matthias J. Sax <matth...@confluent.io
> >
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I am just catching up on this thread. (1) as most people agree, we
> >> > should not add anything to KStreamBuilder (btw: we actually plan to
> move
> >> > #merge() to KStream and deprecate it on KStreamBuilder as it's a quite
> >> > unnatural API atm).
> >> >
> >> > About specifying Serdes: there is still the idea to improve to overall
> >> > API from the current "we are adding more overloads"-pattern to a
> >> > builder-like pattern. This might make the whole discussion void if we
> do
> >> > this. Thus, it might make sense to keep this in mind (or even delay
> this
> >> > KIP?). It seems a waste of time to discuss all this if we are going to
> >> > chance it in 2 month anyway... Just saying.
> >> >
> >> >
> >> > -Matthias
> >> >
> >> > On 6/13/17 8:05 AM, Michal Borowiecki wrote:
> >> > > You're right, I haven't thought of that.
> >> > >
> >> > > Cheers,
> >> > >
> >> > > Michał
> >> > >
> >> > >
> >> > > On 13/06/17 13:00, Kyle Winkelman wrote:
> >> > >> First, I would prefer not calling it aggregate because there are
> >> already
> >> > >> plenty of aggregate methods.
> >> > >>
> >> > >> Second, I dont think this would really work because after each
> >> aggregate
> >> > >> you now have a unique KTable (someone may want a table with 4
> streams
> >> > and
> >> > >> reuse those 4 in another table but with one more stream added) and
> >> > unless
> >> > >> we completely duplicate everything every time this isnt really
> >> possible.
> >> > >> Additionally, the cogroup way just requires 1 more call to create
> two
> >> > >> different tables (normal, windowed, and session windowed) this new
> >> way
> >> > >> would require copying the aggregate chain.
> >> > >>
> >> > >> Another way to think about it is with cogroup we know that when
> they
> >> > call
> >> > >> aggregate they arent going to be adding any more aggregators to
> that
> >> > table
> >> > >> but your way requires us to assume they are done adding aggregators
> >> > after
> >> > >> each call so we must return a ktable just to possibly not need to
> >> have
> >> > >> created it.
> >> > >>
> >> > >> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <
> >> > michal.borowie...@openbet.com>
> >> > >> wrote:
> >> > >>
> >> > >>> Actually, just had a thought. It started with naming.
> >> > >>>
> >> > >>> Are we actually co-grouping these streams or are we co-aggregating
> >> > them?
> >> > >>>
> >> > >>> After all, in each of the cogroup calls we are providing an
> >> Aggregator
> >> > >>> implementation.
> >> > >>>
> >> > >>>
> >> > >>> If they are really co-aggregated, why don't we turn this around:
> >> > >>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
> >> > groupByKey();
> >> > >>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
> >> > groupByKey();
> >> > >>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
> >> > groupByKey();
> >> > >>>
> >> > >>> KTable<K, CG> coagg = grouped1.aggregate(initializer1,
> aggregator1,
> >> > >>> aggValueSerde1) // this is the unchanged aggregate method
> >> > >>>         .aggregate(grouped2, aggregator2)  // this is a new method
> >> > >>>         .aggregate(grouped3, aggregator3); // ditto
> >> > >>>
> >> > >>> This means instead of adding cogroup methods on KGroupStream
> >> interface,
> >> > >>> adding aggregate method on KTable interface.
> >> > >>>
> >> > >>> Is that feasible?
> >> > >>>
> >> > >>> Cheers,
> >> > >>> Michał
> >> > >>>
> >> > >>> On 13/06/17 10:56, Michal Borowiecki wrote:
> >> > >>>
> >> > >>> Also, I still feel that putting initializer on the first cogroup
> can
> >> > >>> mislead users into thinking the first stream is in some way
> special.
> >> > >>> Just my 5c.
> >> > >>> Michał
> >> > >>>
> >> > >>> On 13/06/17 09:54, Michal Borowiecki wrote:
> >> > >>>
> >> > >>> Agree completely with the argument for serdes belonging in the
> same
> >> > place
> >> > >>> as the state store name, which is in the aggregate method.
> >> > >>>
> >> > >>> Cheers,
> >> > >>>
> >> > >>> Michał
> >> > >>>
> >> > >>> On 12/06/17 18:20, Xavier Léauté wrote:
> >> > >>>
> >> > >>> I think we are discussing two separate things here, so it might be
> >> > worth
> >> > >>> clarifying:
> >> > >>>
> >> > >>> 1) the position of the initializer with respect to the
> aggregators.
> >> If
> >> > I
> >> > >>> understand correctly, Guozhang seems to think it is more natural
> to
> >> > specify
> >> > >>> the initializer first, despite it not bearing any relation to the
> >> first
> >> > >>> aggregator. I can see the argument for specifying the initializer
> >> > first,
> >> > >>> but I think it is debatable whether mixing it into the first
> cogroup
> >> > call
> >> > >>> leads to a cleaner API or not.
> >> > >>>
> >> > >>> 2) where the serde should be defined (if necessary). Looking at
> our
> >> > >>> existing APIs in KGroupedStreams, we always offer two aggregate()
> >> > >>> methods. The first one takes the name of the store and associated
> >> > aggregate
> >> > >>> value serde e.g. KGroupedStream.aggregate(Initializer<VR>
> >> initializer,
> >> > >>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR>
> >> > aggValueSerde,
> >> > >>> String queryableStoreName)
> >> > >>> The second one only takes a state store supplier, and does not
> >> specify
> >> > any
> >> > >>> serde, e.g. KGroupedStream.aggregate(Initializer<VR>
> >> > >>> initializer, Aggregator<? super K, ? super V, VR> aggregator,
> final
> >> > >>> StateStoreSupplier<KeyValueStore> storeSupplier)
> >> > >>> Presumably, when specifying a state store supplier it shouldn't be
> >> > >>> necessary to specify an aggregate value serde, since the provided
> >> > >>> statestore might not need to serialize the values (e.g. it may
> just
> >> > keep
> >> > >>> them as regular objects in heap) or it may have its own
> >> > >>> internal serialization format.
> >> > >>>
> >> > >>> For consistency I think it would be valuable to preserve the same
> >> two
> >> > >>> aggregate methods for cogroup as well. Since the serde is only
> >> > required in
> >> > >>> one of the two cases, I believe the serde has no place in the
> first
> >> > >>> cogroup() call and should only have to be specified as part of the
> >> > >>> aggregate() method that takes a state store name. In the case of a
> >> > state
> >> > >>> store supplier, no serde would be necessary.
> >> > >>>
> >> > >>>
> >> > >>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com
> >
> >> > wrote:
> >> > >>>
> >> > >>>> I'd agree that the aggregate value serde and the initializer does
> >> not
> >> > >>>> bear direct relationship with the first `cogroup` calls, but
> after
> >> I
> >> > tried
> >> > >>>> to write some example code with these two different set of APIs I
> >> > felt the
> >> > >>>> current APIs just program more naturally.
> >> > >>>>
> >> > >>>> I know it is kinda subjective, but I do think that user
> experience
> >> > may be
> >> > >>>> more important as a deciding factor than the logical argument for
> >> > public
> >> > >>>> interfaces. So I'd recommend people to also try out writing some
> >> > example
> >> > >>>> lines also and we can circle back and discuss which one feels
> more
> >> > natural
> >> > >>>> to write code.
> >> > >>>>
> >> > >>>>
> >> > >>>> Guozhang
> >> > >>>>
> >> > >>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
> >> > >>>> michal.borowie...@openbet.com> wrote:
> >> > >>>>
> >> > >>>>> I feel it would make more sense to move the initializer and
> serde
> >> to
> >> > the
> >> > >>>>> final aggregate statement, since the serde only applies to the
> >> state
> >> > >>>>> store,
> >> > >>>>> and the initializer doesn't bear any relation to the first group
> >> in
> >> > >>>>> particular.
> >> > >>>>>
> >> > >>>>> +1 for moving initializer and serde from cogroup() to the
> >> aggregate()
> >> > >>>>> for the reasons mentioned above.
> >> > >>>>>
> >> > >>>>> Cheers,
> >> > >>>>>
> >> > >>>>> Michał
> >> > >>>>>
> >> > >>>>> On 08/06/17 22:44, Guozhang Wang wrote:
> >> > >>>>>
> >> > >>>> Note that although the internal `AbstractStoreSupplier` does
> >> maintain
> >> > the
> >> > >>>>> key-value serdes, we do not enforce the interface of
> >> > `StateStoreSupplier`
> >> > >>>>> to always retain that information, and hence we cannot assume
> that
> >> > >>>>> StateStoreSuppliers always retain key / value serdes.
> >> > >>>>>
> >> > >>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <
> >> xav...@confluent.io>
> >> > <xav...@confluent.io> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Another reason for the serde not to be in the first cogroup
> call,
> >> is
> >> > that
> >> > >>>>> the serde should not be required if you pass a
> StateStoreSupplier
> >> to
> >> > >>>>> aggregate()
> >> > >>>>>
> >> > >>>>> Regarding the aggregated type <T> I don't the why initializer
> >> should
> >> > be
> >> > >>>>> favored over aggregator to define the type. In my mind
> separating
> >> the
> >> > >>>>> initializer into the last aggregate call clearly indicates that
> >> the
> >> > >>>>> initializer is independent of any of the aggregators or streams
> >> and
> >> > that we
> >> > >>>>> don't wait for grouped1 events to initialize the co-group.
> >> > >>>>>
> >> > >>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <
> wangg...@gmail.com
> >
> >> <
> >> > wangg...@gmail.com> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> On a second thought... This is the current proposal API
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> ```
> >> > >>>>>
> >> > >>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T>
> >> initializer,
> >> > >>>>>
> >> > >>>>> final
> >> > >>>>>
> >> > >>>>> Aggregator<? super K, ? super V, T> aggregator, final Serde<T>
> >> > >>>>> aggValueSerde)
> >> > >>>>>
> >> > >>>>> ```
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> If we do not have the initializer in the first co-group it might
> >> be
> >> > a bit
> >> > >>>>> awkward for users to specify the aggregator that returns a typed
> >> <T>
> >> > >>>>>
> >> > >>>>> value?
> >> > >>>>>
> >> > >>>>> Maybe it is still better to put these two functions in the same
> >> api?
> >> > >>>>>
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Guozhang
> >> > >>>>>
> >> > >>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <
> >> wangg...@gmail.com>
> >> > <wangg...@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> This suggestion lgtm. I would vote for the first alternative
> than
> >> > >>>>>
> >> > >>>>> adding
> >> > >>>>>
> >> > >>>>> it to the `KStreamBuilder` though.
> >> > >>>>>
> >> > >>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <
> >> xav...@confluent.io>
> >> > <xav...@confluent.io>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> I have a minor suggestion to make the API a little bit more
> >> > symmetric.
> >> > >>>>> I feel it would make more sense to move the initializer and
> serde
> >> to
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> final aggregate statement, since the serde only applies to the
> >> state
> >> > >>>>> store,
> >> > >>>>> and the initializer doesn't bear any relation to the first group
> >> in
> >> > >>>>> particular. It would end up looking like this:
> >> > >>>>>
> >> > >>>>> KTable<K, CG> cogrouped =
> >> > >>>>>     grouped1.cogroup(aggregator1)
> >> > >>>>>             .cogroup(grouped2, aggregator2)
> >> > >>>>>             .cogroup(grouped3, aggregator3)
> >> > >>>>>             .aggregate(initializer1, aggValueSerde, storeName1);
> >> > >>>>>
> >> > >>>>> Alternatively, we could move the the first cogroup() method to
> >> > >>>>> KStreamBuilder, similar to how we have .merge()
> >> > >>>>> and end up with an api that would be even more symmetric.
> >> > >>>>>
> >> > >>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
> >> > >>>>>               .cogroup(grouped2, aggregator2)
> >> > >>>>>               .cogroup(grouped3, aggregator3)
> >> > >>>>>               .aggregate(initializer1, aggValueSerde,
> storeName1);
> >> > >>>>>
> >> > >>>>> This doesn't have to be a blocker, but I thought it would make
> the
> >> > API
> >> > >>>>> just
> >> > >>>>> a tad cleaner.
> >> > >>>>>
> >> > >>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <
> wangg...@gmail.com>
> >> <
> >> > wangg...@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> Kyle,
> >> > >>>>>
> >> > >>>>> Thanks a lot for the updated KIP. It looks good to me.
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Guozhang
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com>
> <
> >> > j...@jagunet.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> This makes much more sense to me. +1
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
> >> > >>>>>
> >> > >>>>> winkelman.k...@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> I have updated the KIP and my PR. Let me know what you think.
> >> > >>>>> To created a cogrouped stream just call cogroup on a
> >> > >>>>>
> >> > >>>>> KgroupedStream
> >> > >>>>>
> >> > >>>>> and
> >> > >>>>>
> >> > >>>>> supply the initializer, aggValueSerde, and an aggregator. Then
> >> > >>>>>
> >> > >>>>> continue
> >> > >>>>>
> >> > >>>>> adding kgroupedstreams and aggregators. Then call one of the
> >> > >>>>>
> >> > >>>>> many
> >> > >>>>>
> >> > >>>>> aggregate
> >> > >>>>>
> >> > >>>>> calls to create a KTable.
> >> > >>>>>
> >> > >>>>> Thanks,
> >> > >>>>> Kyle
> >> > >>>>>
> >> > >>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> <
> >> > damian....@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> Hi Kyle,
> >> > >>>>>
> >> > >>>>> Thanks for the update. I think just one initializer makes sense
> >> > >>>>>
> >> > >>>>> as
> >> > >>>>>
> >> > >>>>> it
> >> > >>>>>
> >> > >>>>> should only be called once per key and generally it is just
> >> > >>>>>
> >> > >>>>> going
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> create
> >> > >>>>>
> >> > >>>>> a new instance of whatever the Aggregate class is.
> >> > >>>>>
> >> > >>>>> Cheers,
> >> > >>>>> Damian
> >> > >>>>>
> >> > >>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
> >> > >>>>>
> >> > >>>>> winkelman.k...@gmail.com
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Hello all,
> >> > >>>>>
> >> > >>>>> I have spent some more time on this and the best alternative I
> >> > >>>>>
> >> > >>>>> have
> >> > >>>>>
> >> > >>>>> come
> >> > >>>>>
> >> > >>>>> up
> >> > >>>>>
> >> > >>>>> with is:
> >> > >>>>> KGroupedStream has a single cogroup call that takes an
> >> > >>>>>
> >> > >>>>> initializer
> >> > >>>>>
> >> > >>>>> and
> >> > >>>>>
> >> > >>>>> an
> >> > >>>>>
> >> > >>>>> aggregator.
> >> > >>>>> CogroupedKStream has a cogroup call that takes additional
> >> > >>>>>
> >> > >>>>> groupedStream
> >> > >>>>>
> >> > >>>>> aggregator pairs.
> >> > >>>>> CogroupedKStream has multiple aggregate methods that create
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> different
> >> > >>>>>
> >> > >>>>> stores.
> >> > >>>>>
> >> > >>>>> I plan on updating the kip but I want people's input on if we
> >> > >>>>>
> >> > >>>>> should
> >> > >>>>>
> >> > >>>>> have
> >> > >>>>>
> >> > >>>>> the initializer be passed in once at the beginning or if we
> >> > >>>>>
> >> > >>>>> should
> >> > >>>>>
> >> > >>>>> instead
> >> > >>>>>
> >> > >>>>> have the initializer be required for each call to one of the
> >> > >>>>>
> >> > >>>>> aggregate
> >> > >>>>>
> >> > >>>>> calls. The first makes more sense to me but doesnt allow the
> >> > >>>>>
> >> > >>>>> user
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> specify different initializers for different tables.
> >> > >>>>>
> >> > >>>>> Thanks,
> >> > >>>>> Kyle
> >> > >>>>>
> >> > >>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <
> >> > >>>>>
> >> > >>>>> winkelman.k...@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Yea I really like that idea I'll see what I can do to update
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> kip
> >> > >>>>>
> >> > >>>>> and
> >> > >>>>>
> >> > >>>>> my pr when I have some time. I'm not sure how well creating
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> kstreamaggregates will go though because at that point I will
> >> > >>>>>
> >> > >>>>> have
> >> > >>>>>
> >> > >>>>> thrown
> >> > >>>>>
> >> > >>>>> away the type of the values. It will be type safe I just may
> >> > >>>>>
> >> > >>>>> need to
> >> > >>>>>
> >> > >>>>> do a
> >> > >>>>>
> >> > >>>>> little forcing.
> >> > >>>>>
> >> > >>>>> Thanks,
> >> > >>>>> Kyle
> >> > >>>>>
> >> > >>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> Kyle,
> >> > >>>>>
> >> > >>>>> Thanks for the explanations, my previous read on the wiki
> >> > >>>>>
> >> > >>>>> examples
> >> > >>>>>
> >> > >>>>> was
> >> > >>>>>
> >> > >>>>> wrong.
> >> > >>>>>
> >> > >>>>> So I guess my motivation should be "reduced" to: can we move
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> window
> >> > >>>>>
> >> > >>>>> specs param from "KGroupedStream#cogroup(..)" to
> >> > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
> >> > >>>>>
> >> > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream
> >> > >>>>>
> >> > >>>>> from
> >> > >>>>>
> >> > >>>>> 3
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> 2.
> >> > >>>>>
> >> > >>>>> 2. major: this is for extensibility of the APIs, and since
> >> > >>>>>
> >> > >>>>> we
> >> > >>>>>
> >> > >>>>> are
> >> > >>>>>
> >> > >>>>> removing
> >> > >>>>>
> >> > >>>>> the "Evolving" annotations on Streams it may be harder to
> >> > >>>>>
> >> > >>>>> change it
> >> > >>>>>
> >> > >>>>> again
> >> > >>>>>
> >> > >>>>> in the future. The extended use cases are that people wanted
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> have
> >> > >>>>>
> >> > >>>>> windowed running aggregates on different granularities, e.g.
> >> > >>>>>
> >> > >>>>> "give
> >> > >>>>>
> >> > >>>>> me
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> counts per-minute, per-hour, per-day and per-week", and
> >> > >>>>>
> >> > >>>>> today
> >> > >>>>>
> >> > >>>>> in
> >> > >>>>>
> >> > >>>>> DSL
> >> > >>>>>
> >> > >>>>> we
> >> > >>>>>
> >> > >>>>> need to specify that case in multiple aggregate operators,
> >> > >>>>>
> >> > >>>>> which
> >> > >>>>>
> >> > >>>>> gets
> >> > >>>>>
> >> > >>>>> a
> >> > >>>>>
> >> > >>>>> state store / changelog, etc. And it is possible to optimize
> >> > >>>>>
> >> > >>>>> it
> >> > >>>>>
> >> > >>>>> as
> >> > >>>>>
> >> > >>>>> well
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> a single state store. Its implementation would be tricky as
> >> > >>>>>
> >> > >>>>> you
> >> > >>>>>
> >> > >>>>> need
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> contain different lengthed windows within your window store
> >> > >>>>>
> >> > >>>>> but
> >> > >>>>>
> >> > >>>>> just
> >> > >>>>>
> >> > >>>>> from
> >> > >>>>>
> >> > >>>>> the public API point of view, it could be specified as:
> >> > >>>>>
> >> > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
> >> > >>>>> "state-store-name");
> >> > >>>>>
> >> > >>>>> table1 = stream.aggregate(/*per-minute window*/)
> >> > >>>>> table2 = stream.aggregate(/*per-hour window*/)
> >> > >>>>> table3 = stream.aggregate(/*per-day window*/)
> >> > >>>>>
> >> > >>>>> while underlying we are only using a single store
> >> > >>>>>
> >> > >>>>> "state-store-name"
> >> > >>>>>
> >> > >>>>> for
> >> > >>>>>
> >> > >>>>> it.
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Although this feature is out of the scope of this KIP, I'd
> >> > >>>>>
> >> > >>>>> like
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> discuss
> >> > >>>>>
> >> > >>>>> if we can "leave the door open" to make such changes without
> >> > >>>>>
> >> > >>>>> modifying
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> public APIs .
> >> > >>>>>
> >> > >>>>> Guozhang
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> >> > >>>>>
> >> > >>>>> winkelman.k...@gmail.com
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> I allow defining a single window/sessionwindow one time
> >> > >>>>>
> >> > >>>>> when
> >> > >>>>>
> >> > >>>>> you
> >> > >>>>>
> >> > >>>>> make
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> cogroup call from a KGroupedStream. From then on you are
> >> > >>>>>
> >> > >>>>> using
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> cogroup
> >> > >>>>>
> >> > >>>>> call from with in CogroupedKStream which doesnt accept any
> >> > >>>>>
> >> > >>>>> additional
> >> > >>>>>
> >> > >>>>> windows/sessionwindows.
> >> > >>>>>
> >> > >>>>> Is this what you meant by your question or did I
> >> > >>>>>
> >> > >>>>> misunderstand?
> >> > >>>>>
> >> > >>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <
> >> > >>>>>
> >> > >>>>> wangg...@gmail.com
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> Another question that came to me is on "window alignment":
> >> > >>>>>
> >> > >>>>> from
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> KIP
> >> > >>>>>
> >> > >>>>> it
> >> > >>>>>
> >> > >>>>> seems you are allowing users to specify a (potentially
> >> > >>>>>
> >> > >>>>> different)
> >> > >>>>>
> >> > >>>>> window
> >> > >>>>>
> >> > >>>>> spec in each co-grouped input stream. So if these window
> >> > >>>>>
> >> > >>>>> specs
> >> > >>>>>
> >> > >>>>> are
> >> > >>>>>
> >> > >>>>> different how should we "align" them with different input
> >> > >>>>>
> >> > >>>>> streams? I
> >> > >>>>>
> >> > >>>>> think
> >> > >>>>>
> >> > >>>>> it is more natural to only specify on window spec in the
> >> > >>>>>
> >> > >>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> And remove it from the cogroup() functions. WDYT?
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> Guozhang
> >> > >>>>>
> >> > >>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
> >> > >>>>>
> >> > >>>>> wangg...@gmail.com>
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> Thanks for the proposal Kyle, this is a quite common use
> >> > >>>>>
> >> > >>>>> case
> >> > >>>>>
> >> > >>>>> to
> >> > >>>>>
> >> > >>>>> support
> >> > >>>>>
> >> > >>>>> such multi-way table join (i.e. N source tables with N
> >> > >>>>>
> >> > >>>>> aggregate
> >> > >>>>>
> >> > >>>>> func)
> >> > >>>>>
> >> > >>>>> with
> >> > >>>>>
> >> > >>>>> a single store and N+1 serdes, I have seen lots of people
> >> > >>>>>
> >> > >>>>> using
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> low-level PAPI to achieve this goal.
> >> > >>>>>
> >> > >>>>>
> >> > >>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> >> > >>>>>
> >> > >>>>> winkelman.k...@gmail.com
> >> > >>>>>
> >> > >>>>> wrote:
> >> > >>>>>
> >> > >>>>> I like your point about not handling other cases such as
> >> > >>>>>
> >> > >>>>> count
> >> > >>>>>
> >> > >>>>> and
> >> > >>>>>
> >> > >>>>> reduce.
> >> > >>>>>
> >> > >>>>> I think that reduce may not make sense because reduce
> >> > >>>>>
> >> > >>>>> assumes
> >> > >>>>>
> >> > >>>>> that
> >> > >>>>>
> >> > >>>>> the
> >> > >>>>>
> >> > >>>>> input values are the same as the output values. With
> >> > >>>>>
> >> > >>>>> cogroup
> >> > >>>>>
> >> > >>>>> ...
> >> > >
> >> > > --
> >> > > Signature
> >> > > <http://www.openbet.com/>     Michal Borowiecki
> >> > > Senior Software Engineer L4
> >> > >       T:      +44 208 742 1600 <+44%2020%208742%201600>
> >> > >
> >> > >
> >> > >       +44 203 249 8448 <+44%2020%203249%208448>
> >> > >
> >> > >
> >> > >
> >> > >       E:      michal.borowie...@openbet.com
> >> > >       W:      www.openbet.com <http://www.openbet.com/>
> >> > >
> >> > >
> >> > >       OpenBet Ltd
> >> > >
> >> > >       Chiswick Park Building 9
> >> > >
> >> > >       566 Chiswick High Rd
> >> > >
> >> > >       London
> >> > >
> >> > >       W4 5XT
> >> > >
> >> > >       UK
> >> > >
> >> > >
> >> > > <https://www.openbet.com/email_promo>
> >> > >
> >> > > This message is confidential and intended only for the addressee. If
> >> you
> >> > > have received this message in error, please immediately notify the
> >> > > postmas...@openbet.com <mailto:postmas...@openbet.com> and delete
> it
> >> > > from your system as well as any copies. The content of e-mails as
> well
> >> > > as traffic data may be monitored by OpenBet for employment and
> >> security
> >> > > purposes. To protect the environment please do not print this e-mail
> >> > > unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
> >> Building
> >> > > 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company
> >> > > registered in England and Wales. Registered no. 3134634. VAT no.
> >> > > GB927523612
> >> > >
> >> >
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to