The KIP and PR have been updated please go take a look and vote.

For those worried about the [DISCUSS] Streams DSL/StateStore Refactoring
email thread affecting this I believe the cogroup methods fit well into the
streams dsl and won't need to change. We can update the aggregate methods
in the same way we choose to update them in KGroupedStream.

Thanks,
Kyle

On Jun 14, 2017 8:14 PM, "Bill Bejeck" <bbej...@gmail.com> wrote:

> +1
>
> Thanks,
> Bill
>
> On Wed, Jun 14, 2017 at 8:10 PM, Xavier Léauté <xav...@confluent.io>
> wrote:
>
> > +1 from me
> >
> > any stream should be able to initialize the cogroup
> >
> > On Wed, Jun 14, 2017 at 3:44 PM Kyle Winkelman <winkelman.k...@gmail.com
> >
> > wrote:
> >
> > > I will update the kip to have only the aggregator in the first cogroup
> > call
> > > and the initializer and serde in the aggregate calls.
> > >
> > > This seems to be the consensus on the email chain.
> > >
> > > Thanks,
> > > Kyle
> > >
> > > On Jun 14, 2017 5:41 PM, wrote:
> > >
> > > That is not the case. No matter which stream the record comes in on the
> > > initializer is called if there is not currently an object in the store.
> > >
> > > On Jun 14, 2017 5:12 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> > >
> > > While regarding where we should ask users to set serdes: I think I'm
> > > convinced by Xavier that they should be in the `aggregate` call (but
> only
> > > those does not pass in a state store supplier) instead of the
> > > `KStream#cogroup` call to be consistent with other aggregate functions.
> > >
> > > BTW another motivation for me to suggest keeping the initializer on the
> > > first stream is that by reviewing the PR (some time ago though, so
> again
> > I
> > > might be wrong) we will trigger the initializer only when we received
> an
> > > incoming record from the first stream whose key is not in the state
> store
> > > yet, while for other streams we will just drop it on the floor. If that
> > is
> > > actually not the case, that we call initializer on any one of the
> > > co-grouped streams' incoming records, then I'm open to set the
> > initializer
> > > at the `aggregate` call as well.
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Jun 14, 2017 at 2:23 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > I'd suggest we do not block this KIP until the serde work has been
> > sorted
> > > > out: we cannot estimate yet how long it will take yet. Instead let's
> > say
> > > > make an agreement on where we want to specify the serdes: whether on
> > the
> > > > first co-group call or on the aggregate call.
> > > >
> > > > Also about the initializer specification I actually felt that the
> first
> > > > cogrouped stream is special (Kyle please feel free to correct me if
> I'm
> > > > wrong) and that is why I thought it is better to specify the
> > initializer
> > > at
> > > > the beginning: since from the typing you can see that the final
> > > aggregated
> > > > value type is defined to be the same as the first co-grouped stream,
> > and
> > > > for any intermediate stream to co-group, their value types not be
> > > inherited
> > > > but the value be "incorporated" into the original stream:
> > > >
> > > >  <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T>
> > > > groupedStream, final Aggregator<? super K, ? super T, V> aggregator)
> > > >
> > > > Note that we do not have a cogroup function that returns
> > > > CogroupedKStream<K, T>.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Jun 13, 2017 at 2:31 PM, Bill Bejeck <bbej...@gmail.com>
> > wrote:
> > > >
> > > >> +1 on deferring discussion on Serdes until API improvements are
> ironed
> > > >> out.
> > > >>
> > > >> On Tue, Jun 13, 2017 at 2:06 PM, Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > >> wrote:
> > > >>
> > > >> > Hi,
> > > >> >
> > > >> > I am just catching up on this thread. (1) as most people agree, we
> > > >> > should not add anything to KStreamBuilder (btw: we actually plan
> to
> > > move
> > > >> > #merge() to KStream and deprecate it on KStreamBuilder as it's a
> > quite
> > > >> > unnatural API atm).
> > > >> >
> > > >> > About specifying Serdes: there is still the idea to improve to
> > overall
> > > >> > API from the current "we are adding more overloads"-pattern to a
> > > >> > builder-like pattern. This might make the whole discussion void if
> > we
> > > do
> > > >> > this. Thus, it might make sense to keep this in mind (or even
> delay
> > > this
> > > >> > KIP?). It seems a waste of time to discuss all this if we are
> going
> > to
> > > >> > chance it in 2 month anyway... Just saying.
> > > >> >
> > > >> >
> > > >> > -Matthias
> > > >> >
> > > >> > On 6/13/17 8:05 AM, Michal Borowiecki wrote:
> > > >> > > You're right, I haven't thought of that.
> > > >> > >
> > > >> > > Cheers,
> > > >> > >
> > > >> > > Michał
> > > >> > >
> > > >> > >
> > > >> > > On 13/06/17 13:00, Kyle Winkelman wrote:
> > > >> > >> First, I would prefer not calling it aggregate because there
> are
> > > >> already
> > > >> > >> plenty of aggregate methods.
> > > >> > >>
> > > >> > >> Second, I dont think this would really work because after each
> > > >> aggregate
> > > >> > >> you now have a unique KTable (someone may want a table with 4
> > > streams
> > > >> > and
> > > >> > >> reuse those 4 in another table but with one more stream added)
> > and
> > > >> > unless
> > > >> > >> we completely duplicate everything every time this isnt really
> > > >> possible.
> > > >> > >> Additionally, the cogroup way just requires 1 more call to
> create
> > > two
> > > >> > >> different tables (normal, windowed, and session windowed) this
> > new
> > > >> way
> > > >> > >> would require copying the aggregate chain.
> > > >> > >>
> > > >> > >> Another way to think about it is with cogroup we know that when
> > > they
> > > >> > call
> > > >> > >> aggregate they arent going to be adding any more aggregators to
> > > that
> > > >> > table
> > > >> > >> but your way requires us to assume they are done adding
> > aggregators
> > > >> > after
> > > >> > >> each call so we must return a ktable just to possibly not need
> to
> > > >> have
> > > >> > >> created it.
> > > >> > >>
> > > >> > >> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <
> > > >> > michal.borowie...@openbet.com>
> > > >> > >> wrote:
> > > >> > >>
> > > >> > >>> Actually, just had a thought. It started with naming.
> > > >> > >>>
> > > >> > >>> Are we actually co-grouping these streams or are we
> > co-aggregating
> > > >> > them?
> > > >> > >>>
> > > >> > >>> After all, in each of the cogroup calls we are providing an
> > > >> Aggregator
> > > >> > >>> implementation.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> If they are really co-aggregated, why don't we turn this
> around:
> > > >> > >>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").
> > > >> > groupByKey();
> > > >> > >>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").
> > > >> > groupByKey();
> > > >> > >>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").
> > > >> > groupByKey();
> > > >> > >>>
> > > >> > >>> KTable<K, CG> coagg = grouped1.aggregate(initializer1,
> > > aggregator1,
> > > >> > >>> aggValueSerde1) // this is the unchanged aggregate method
> > > >> > >>>         .aggregate(grouped2, aggregator2)  // this is a new
> > method
> > > >> > >>>         .aggregate(grouped3, aggregator3); // ditto
> > > >> > >>>
> > > >> > >>> This means instead of adding cogroup methods on KGroupStream
> > > >> interface,
> > > >> > >>> adding aggregate method on KTable interface.
> > > >> > >>>
> > > >> > >>> Is that feasible?
> > > >> > >>>
> > > >> > >>> Cheers,
> > > >> > >>> Michał
> > > >> > >>>
> > > >> > >>> On 13/06/17 10:56, Michal Borowiecki wrote:
> > > >> > >>>
> > > >> > >>> Also, I still feel that putting initializer on the first
> cogroup
> > > can
> > > >> > >>> mislead users into thinking the first stream is in some way
> > > special.
> > > >> > >>> Just my 5c.
> > > >> > >>> Michał
> > > >> > >>>
> > > >> > >>> On 13/06/17 09:54, Michal Borowiecki wrote:
> > > >> > >>>
> > > >> > >>> Agree completely with the argument for serdes belonging in the
> > > same
> > > >> > place
> > > >> > >>> as the state store name, which is in the aggregate method.
> > > >> > >>>
> > > >> > >>> Cheers,
> > > >> > >>>
> > > >> > >>> Michał
> > > >> > >>>
> > > >> > >>> On 12/06/17 18:20, Xavier Léauté wrote:
> > > >> > >>>
> > > >> > >>> I think we are discussing two separate things here, so it
> might
> > be
> > > >> > worth
> > > >> > >>> clarifying:
> > > >> > >>>
> > > >> > >>> 1) the position of the initializer with respect to the
> > > aggregators.
> > > >> If
> > > >> > I
> > > >> > >>> understand correctly, Guozhang seems to think it is more
> natural
> > > to
> > > >> > specify
> > > >> > >>> the initializer first, despite it not bearing any relation to
> > the
> > > >> first
> > > >> > >>> aggregator. I can see the argument for specifying the
> > initializer
> > > >> > first,
> > > >> > >>> but I think it is debatable whether mixing it into the first
> > > cogroup
> > > >> > call
> > > >> > >>> leads to a cleaner API or not.
> > > >> > >>>
> > > >> > >>> 2) where the serde should be defined (if necessary). Looking
> at
> > > our
> > > >> > >>> existing APIs in KGroupedStreams, we always offer two
> > aggregate()
> > > >> > >>> methods. The first one takes the name of the store and
> > associated
> > > >> > aggregate
> > > >> > >>> value serde e.g. KGroupedStream.aggregate(Initializer<VR>
> > > >> initializer,
> > > >> > >>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR>
> > > >> > aggValueSerde,
> > > >> > >>> String queryableStoreName)
> > > >> > >>> The second one only takes a state store supplier, and does not
> > > >> specify
> > > >> > any
> > > >> > >>> serde, e.g. KGroupedStream.aggregate(Initializer<VR>
> > > >> > >>> initializer, Aggregator<? super K, ? super V, VR> aggregator,
> > > final
> > > >> > >>> StateStoreSupplier<KeyValueStore> storeSupplier)
> > > >> > >>> Presumably, when specifying a state store supplier it
> shouldn't
> > be
> > > >> > >>> necessary to specify an aggregate value serde, since the
> > provided
> > > >> > >>> statestore might not need to serialize the values (e.g. it may
> > > just
> > > >> > keep
> > > >> > >>> them as regular objects in heap) or it may have its own
> > > >> > >>> internal serialization format.
> > > >> > >>>
> > > >> > >>> For consistency I think it would be valuable to preserve the
> > same
> > > >> two
> > > >> > >>> aggregate methods for cogroup as well. Since the serde is only
> > > >> > required in
> > > >> > >>> one of the two cases, I believe the serde has no place in the
> > > first
> > > >> > >>> cogroup() call and should only have to be specified as part of
> > the
> > > >> > >>> aggregate() method that takes a state store name. In the case
> > of a
> > > >> > state
> > > >> > >>> store supplier, no serde would be necessary.
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > >> > wrote:
> > > >> > >>>
> > > >> > >>>> I'd agree that the aggregate value serde and the initializer
> > does
> > > >> not
> > > >> > >>>> bear direct relationship with the first `cogroup` calls, but
> > > after
> > > >> I
> > > >> > tried
> > > >> > >>>> to write some example code with these two different set of
> > APIs I
> > > >> > felt the
> > > >> > >>>> current APIs just program more naturally.
> > > >> > >>>>
> > > >> > >>>> I know it is kinda subjective, but I do think that user
> > > experience
> > > >> > may be
> > > >> > >>>> more important as a deciding factor than the logical argument
> > for
> > > >> > public
> > > >> > >>>> interfaces. So I'd recommend people to also try out writing
> > some
> > > >> > example
> > > >> > >>>> lines also and we can circle back and discuss which one feels
> > > more
> > > >> > natural
> > > >> > >>>> to write code.
> > > >> > >>>>
> > > >> > >>>>
> > > >> > >>>> Guozhang
> > > >> > >>>>
> > > >> > >>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
> > > >> > >>>> michal.borowie...@openbet.com> wrote:
> > > >> > >>>>
> > > >> > >>>>> I feel it would make more sense to move the initializer and
> > > serde
> > > >> to
> > > >> > the
> > > >> > >>>>> final aggregate statement, since the serde only applies to
> the
> > > >> state
> > > >> > >>>>> store,
> > > >> > >>>>> and the initializer doesn't bear any relation to the first
> > group
> > > >> in
> > > >> > >>>>> particular.
> > > >> > >>>>>
> > > >> > >>>>> +1 for moving initializer and serde from cogroup() to the
> > > >> aggregate()
> > > >> > >>>>> for the reasons mentioned above.
> > > >> > >>>>>
> > > >> > >>>>> Cheers,
> > > >> > >>>>>
> > > >> > >>>>> Michał
> > > >> > >>>>>
> > > >> > >>>>> On 08/06/17 22:44, Guozhang Wang wrote:
> > > >> > >>>>>
> > > >> > >>>> Note that although the internal `AbstractStoreSupplier` does
> > > >> maintain
> > > >> > the
> > > >> > >>>>> key-value serdes, we do not enforce the interface of
> > > >> > `StateStoreSupplier`
> > > >> > >>>>> to always retain that information, and hence we cannot
> assume
> > > that
> > > >> > >>>>> StateStoreSuppliers always retain key / value serdes.
> > > >> > >>>>>
> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <
> > > >> xav...@confluent.io>
> > > >> > <xav...@confluent.io> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Another reason for the serde not to be in the first cogroup
> > > call,
> > > >> is
> > > >> > that
> > > >> > >>>>> the serde should not be required if you pass a
> > > StateStoreSupplier
> > > >> to
> > > >> > >>>>> aggregate()
> > > >> > >>>>>
> > > >> > >>>>> Regarding the aggregated type <T> I don't the why
> initializer
> > > >> should
> > > >> > be
> > > >> > >>>>> favored over aggregator to define the type. In my mind
> > > separating
> > > >> the
> > > >> > >>>>> initializer into the last aggregate call clearly indicates
> > that
> > > >> the
> > > >> > >>>>> initializer is independent of any of the aggregators or
> > streams
> > > >> and
> > > >> > that we
> > > >> > >>>>> don't wait for grouped1 events to initialize the co-group.
> > > >> > >>>>>
> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <
> > > wangg...@gmail.com
> > > >
> > > >> <
> > > >> > wangg...@gmail.com> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> On a second thought... This is the current proposal API
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> ```
> > > >> > >>>>>
> > > >> > >>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T>
> > > >> initializer,
> > > >> > >>>>>
> > > >> > >>>>> final
> > > >> > >>>>>
> > > >> > >>>>> Aggregator<? super K, ? super V, T> aggregator, final
> Serde<T>
> > > >> > >>>>> aggValueSerde)
> > > >> > >>>>>
> > > >> > >>>>> ```
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> If we do not have the initializer in the first co-group it
> > might
> > > >> be
> > > >> > a bit
> > > >> > >>>>> awkward for users to specify the aggregator that returns a
> > typed
> > > >> <T>
> > > >> > >>>>>
> > > >> > >>>>> value?
> > > >> > >>>>>
> > > >> > >>>>> Maybe it is still better to put these two functions in the
> > same
> > > >> api?
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Guozhang
> > > >> > >>>>>
> > > >> > >>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <
> > > >> wangg...@gmail.com>
> > > >> > <wangg...@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> This suggestion lgtm. I would vote for the first alternative
> > > than
> > > >> > >>>>>
> > > >> > >>>>> adding
> > > >> > >>>>>
> > > >> > >>>>> it to the `KStreamBuilder` though.
> > > >> > >>>>>
> > > >> > >>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <
> > > >> xav...@confluent.io>
> > > >> > <xav...@confluent.io>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> I have a minor suggestion to make the API a little bit more
> > > >> > symmetric.
> > > >> > >>>>> I feel it would make more sense to move the initializer and
> > > serde
> > > >> to
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> final aggregate statement, since the serde only applies to
> the
> > > >> state
> > > >> > >>>>> store,
> > > >> > >>>>> and the initializer doesn't bear any relation to the first
> > group
> > > >> in
> > > >> > >>>>> particular. It would end up looking like this:
> > > >> > >>>>>
> > > >> > >>>>> KTable<K, CG> cogrouped =
> > > >> > >>>>>     grouped1.cogroup(aggregator1)
> > > >> > >>>>>             .cogroup(grouped2, aggregator2)
> > > >> > >>>>>             .cogroup(grouped3, aggregator3)
> > > >> > >>>>>             .aggregate(initializer1, aggValueSerde,
> > storeName1);
> > > >> > >>>>>
> > > >> > >>>>> Alternatively, we could move the the first cogroup() method
> to
> > > >> > >>>>> KStreamBuilder, similar to how we have .merge()
> > > >> > >>>>> and end up with an api that would be even more symmetric.
> > > >> > >>>>>
> > > >> > >>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
> > > >> > >>>>>               .cogroup(grouped2, aggregator2)
> > > >> > >>>>>               .cogroup(grouped3, aggregator3)
> > > >> > >>>>>               .aggregate(initializer1, aggValueSerde,
> > > storeName1);
> > > >> > >>>>>
> > > >> > >>>>> This doesn't have to be a blocker, but I thought it would
> make
> > > the
> > > >> > API
> > > >> > >>>>> just
> > > >> > >>>>> a tad cleaner.
> > > >> > >>>>>
> > > >> > >>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <
> > > wangg...@gmail.com>
> > > >> <
> > > >> > wangg...@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> Kyle,
> > > >> > >>>>>
> > > >> > >>>>> Thanks a lot for the updated KIP. It looks good to me.
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Guozhang
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <
> > j...@jagunet.com>
> > > <
> > > >> > j...@jagunet.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> This makes much more sense to me. +1
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <
> > > >> > >>>>>
> > > >> > >>>>> winkelman.k...@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> I have updated the KIP and my PR. Let me know what you
> think.
> > > >> > >>>>> To created a cogrouped stream just call cogroup on a
> > > >> > >>>>>
> > > >> > >>>>> KgroupedStream
> > > >> > >>>>>
> > > >> > >>>>> and
> > > >> > >>>>>
> > > >> > >>>>> supply the initializer, aggValueSerde, and an aggregator.
> Then
> > > >> > >>>>>
> > > >> > >>>>> continue
> > > >> > >>>>>
> > > >> > >>>>> adding kgroupedstreams and aggregators. Then call one of the
> > > >> > >>>>>
> > > >> > >>>>> many
> > > >> > >>>>>
> > > >> > >>>>> aggregate
> > > >> > >>>>>
> > > >> > >>>>> calls to create a KTable.
> > > >> > >>>>>
> > > >> > >>>>> Thanks,
> > > >> > >>>>> Kyle
> > > >> > >>>>>
> > > >> > >>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com>
> <
> > > >> > damian....@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> Hi Kyle,
> > > >> > >>>>>
> > > >> > >>>>> Thanks for the update. I think just one initializer makes
> > sense
> > > >> > >>>>>
> > > >> > >>>>> as
> > > >> > >>>>>
> > > >> > >>>>> it
> > > >> > >>>>>
> > > >> > >>>>> should only be called once per key and generally it is just
> > > >> > >>>>>
> > > >> > >>>>> going
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> create
> > > >> > >>>>>
> > > >> > >>>>> a new instance of whatever the Aggregate class is.
> > > >> > >>>>>
> > > >> > >>>>> Cheers,
> > > >> > >>>>> Damian
> > > >> > >>>>>
> > > >> > >>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <
> > > >> > >>>>>
> > > >> > >>>>> winkelman.k...@gmail.com
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Hello all,
> > > >> > >>>>>
> > > >> > >>>>> I have spent some more time on this and the best
> alternative I
> > > >> > >>>>>
> > > >> > >>>>> have
> > > >> > >>>>>
> > > >> > >>>>> come
> > > >> > >>>>>
> > > >> > >>>>> up
> > > >> > >>>>>
> > > >> > >>>>> with is:
> > > >> > >>>>> KGroupedStream has a single cogroup call that takes an
> > > >> > >>>>>
> > > >> > >>>>> initializer
> > > >> > >>>>>
> > > >> > >>>>> and
> > > >> > >>>>>
> > > >> > >>>>> an
> > > >> > >>>>>
> > > >> > >>>>> aggregator.
> > > >> > >>>>> CogroupedKStream has a cogroup call that takes additional
> > > >> > >>>>>
> > > >> > >>>>> groupedStream
> > > >> > >>>>>
> > > >> > >>>>> aggregator pairs.
> > > >> > >>>>> CogroupedKStream has multiple aggregate methods that create
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> different
> > > >> > >>>>>
> > > >> > >>>>> stores.
> > > >> > >>>>>
> > > >> > >>>>> I plan on updating the kip but I want people's input on if
> we
> > > >> > >>>>>
> > > >> > >>>>> should
> > > >> > >>>>>
> > > >> > >>>>> have
> > > >> > >>>>>
> > > >> > >>>>> the initializer be passed in once at the beginning or if we
> > > >> > >>>>>
> > > >> > >>>>> should
> > > >> > >>>>>
> > > >> > >>>>> instead
> > > >> > >>>>>
> > > >> > >>>>> have the initializer be required for each call to one of the
> > > >> > >>>>>
> > > >> > >>>>> aggregate
> > > >> > >>>>>
> > > >> > >>>>> calls. The first makes more sense to me but doesnt allow the
> > > >> > >>>>>
> > > >> > >>>>> user
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> specify different initializers for different tables.
> > > >> > >>>>>
> > > >> > >>>>> Thanks,
> > > >> > >>>>> Kyle
> > > >> > >>>>>
> > > >> > >>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <
> > > >> > >>>>>
> > > >> > >>>>> winkelman.k...@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Yea I really like that idea I'll see what I can do to update
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> kip
> > > >> > >>>>>
> > > >> > >>>>> and
> > > >> > >>>>>
> > > >> > >>>>> my pr when I have some time. I'm not sure how well creating
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> kstreamaggregates will go though because at that point I
> will
> > > >> > >>>>>
> > > >> > >>>>> have
> > > >> > >>>>>
> > > >> > >>>>> thrown
> > > >> > >>>>>
> > > >> > >>>>> away the type of the values. It will be type safe I just may
> > > >> > >>>>>
> > > >> > >>>>> need to
> > > >> > >>>>>
> > > >> > >>>>> do a
> > > >> > >>>>>
> > > >> > >>>>> little forcing.
> > > >> > >>>>>
> > > >> > >>>>> Thanks,
> > > >> > >>>>> Kyle
> > > >> > >>>>>
> > > >> > >>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <
> wangg...@gmail.com
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> Kyle,
> > > >> > >>>>>
> > > >> > >>>>> Thanks for the explanations, my previous read on the wiki
> > > >> > >>>>>
> > > >> > >>>>> examples
> > > >> > >>>>>
> > > >> > >>>>> was
> > > >> > >>>>>
> > > >> > >>>>> wrong.
> > > >> > >>>>>
> > > >> > >>>>> So I guess my motivation should be "reduced" to: can we move
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> window
> > > >> > >>>>>
> > > >> > >>>>> specs param from "KGroupedStream#cogroup(..)" to
> > > >> > >>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
> > > >> > >>>>>
> > > >> > >>>>> 1. minor: we can reduce the #.generics in CogroupedKStream
> > > >> > >>>>>
> > > >> > >>>>> from
> > > >> > >>>>>
> > > >> > >>>>> 3
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> 2.
> > > >> > >>>>>
> > > >> > >>>>> 2. major: this is for extensibility of the APIs, and since
> > > >> > >>>>>
> > > >> > >>>>> we
> > > >> > >>>>>
> > > >> > >>>>> are
> > > >> > >>>>>
> > > >> > >>>>> removing
> > > >> > >>>>>
> > > >> > >>>>> the "Evolving" annotations on Streams it may be harder to
> > > >> > >>>>>
> > > >> > >>>>> change it
> > > >> > >>>>>
> > > >> > >>>>> again
> > > >> > >>>>>
> > > >> > >>>>> in the future. The extended use cases are that people wanted
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> have
> > > >> > >>>>>
> > > >> > >>>>> windowed running aggregates on different granularities, e.g.
> > > >> > >>>>>
> > > >> > >>>>> "give
> > > >> > >>>>>
> > > >> > >>>>> me
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> counts per-minute, per-hour, per-day and per-week", and
> > > >> > >>>>>
> > > >> > >>>>> today
> > > >> > >>>>>
> > > >> > >>>>> in
> > > >> > >>>>>
> > > >> > >>>>> DSL
> > > >> > >>>>>
> > > >> > >>>>> we
> > > >> > >>>>>
> > > >> > >>>>> need to specify that case in multiple aggregate operators,
> > > >> > >>>>>
> > > >> > >>>>> which
> > > >> > >>>>>
> > > >> > >>>>> gets
> > > >> > >>>>>
> > > >> > >>>>> a
> > > >> > >>>>>
> > > >> > >>>>> state store / changelog, etc. And it is possible to optimize
> > > >> > >>>>>
> > > >> > >>>>> it
> > > >> > >>>>>
> > > >> > >>>>> as
> > > >> > >>>>>
> > > >> > >>>>> well
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> a single state store. Its implementation would be tricky as
> > > >> > >>>>>
> > > >> > >>>>> you
> > > >> > >>>>>
> > > >> > >>>>> need
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> contain different lengthed windows within your window store
> > > >> > >>>>>
> > > >> > >>>>> but
> > > >> > >>>>>
> > > >> > >>>>> just
> > > >> > >>>>>
> > > >> > >>>>> from
> > > >> > >>>>>
> > > >> > >>>>> the public API point of view, it could be specified as:
> > > >> > >>>>>
> > > >> > >>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
> > > >> > >>>>> "state-store-name");
> > > >> > >>>>>
> > > >> > >>>>> table1 = stream.aggregate(/*per-minute window*/)
> > > >> > >>>>> table2 = stream.aggregate(/*per-hour window*/)
> > > >> > >>>>> table3 = stream.aggregate(/*per-day window*/)
> > > >> > >>>>>
> > > >> > >>>>> while underlying we are only using a single store
> > > >> > >>>>>
> > > >> > >>>>> "state-store-name"
> > > >> > >>>>>
> > > >> > >>>>> for
> > > >> > >>>>>
> > > >> > >>>>> it.
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Although this feature is out of the scope of this KIP, I'd
> > > >> > >>>>>
> > > >> > >>>>> like
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> discuss
> > > >> > >>>>>
> > > >> > >>>>> if we can "leave the door open" to make such changes without
> > > >> > >>>>>
> > > >> > >>>>> modifying
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> public APIs .
> > > >> > >>>>>
> > > >> > >>>>> Guozhang
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <
> > > >> > >>>>>
> > > >> > >>>>> winkelman.k...@gmail.com
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> I allow defining a single window/sessionwindow one time
> > > >> > >>>>>
> > > >> > >>>>> when
> > > >> > >>>>>
> > > >> > >>>>> you
> > > >> > >>>>>
> > > >> > >>>>> make
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> cogroup call from a KGroupedStream. From then on you are
> > > >> > >>>>>
> > > >> > >>>>> using
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> cogroup
> > > >> > >>>>>
> > > >> > >>>>> call from with in CogroupedKStream which doesnt accept any
> > > >> > >>>>>
> > > >> > >>>>> additional
> > > >> > >>>>>
> > > >> > >>>>> windows/sessionwindows.
> > > >> > >>>>>
> > > >> > >>>>> Is this what you meant by your question or did I
> > > >> > >>>>>
> > > >> > >>>>> misunderstand?
> > > >> > >>>>>
> > > >> > >>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <
> > > >> > >>>>>
> > > >> > >>>>> wangg...@gmail.com
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> Another question that came to me is on "window alignment":
> > > >> > >>>>>
> > > >> > >>>>> from
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> KIP
> > > >> > >>>>>
> > > >> > >>>>> it
> > > >> > >>>>>
> > > >> > >>>>> seems you are allowing users to specify a (potentially
> > > >> > >>>>>
> > > >> > >>>>> different)
> > > >> > >>>>>
> > > >> > >>>>> window
> > > >> > >>>>>
> > > >> > >>>>> spec in each co-grouped input stream. So if these window
> > > >> > >>>>>
> > > >> > >>>>> specs
> > > >> > >>>>>
> > > >> > >>>>> are
> > > >> > >>>>>
> > > >> > >>>>> different how should we "align" them with different input
> > > >> > >>>>>
> > > >> > >>>>> streams? I
> > > >> > >>>>>
> > > >> > >>>>> think
> > > >> > >>>>>
> > > >> > >>>>> it is more natural to only specify on window spec in the
> > > >> > >>>>>
> > > >> > >>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> And remove it from the cogroup() functions. WDYT?
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> Guozhang
> > > >> > >>>>>
> > > >> > >>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <
> > > >> > >>>>>
> > > >> > >>>>> wangg...@gmail.com>
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> Thanks for the proposal Kyle, this is a quite common use
> > > >> > >>>>>
> > > >> > >>>>> case
> > > >> > >>>>>
> > > >> > >>>>> to
> > > >> > >>>>>
> > > >> > >>>>> support
> > > >> > >>>>>
> > > >> > >>>>> such multi-way table join (i.e. N source tables with N
> > > >> > >>>>>
> > > >> > >>>>> aggregate
> > > >> > >>>>>
> > > >> > >>>>> func)
> > > >> > >>>>>
> > > >> > >>>>> with
> > > >> > >>>>>
> > > >> > >>>>> a single store and N+1 serdes, I have seen lots of people
> > > >> > >>>>>
> > > >> > >>>>> using
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> low-level PAPI to achieve this goal.
> > > >> > >>>>>
> > > >> > >>>>>
> > > >> > >>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <
> > > >> > >>>>>
> > > >> > >>>>> winkelman.k...@gmail.com
> > > >> > >>>>>
> > > >> > >>>>> wrote:
> > > >> > >>>>>
> > > >> > >>>>> I like your point about not handling other cases such as
> > > >> > >>>>>
> > > >> > >>>>> count
> > > >> > >>>>>
> > > >> > >>>>> and
> > > >> > >>>>>
> > > >> > >>>>> reduce.
> > > >> > >>>>>
> > > >> > >>>>> I think that reduce may not make sense because reduce
> > > >> > >>>>>
> > > >> > >>>>> assumes
> > > >> > >>>>>
> > > >> > >>>>> that
> > > >> > >>>>>
> > > >> > >>>>> the
> > > >> > >>>>>
> > > >> > >>>>> input values are the same as the output values. With
> > > >> > >>>>>
> > > >> > >>>>> cogroup
> > > >> > >>>>>
> > > >> > >>>>> ...
> > > >> > >
> > > >> > > --
> > > >> > > Signature
> > > >> > > <http://www.openbet.com/>     Michal Borowiecki
> > > >> > > Senior Software Engineer L4
> > > >> > >       T:      +44 208 742 1600 <+44%2020%208742%201600>
> > > >> > >
> > > >> > >
> > > >> > >       +44 203 249 8448 <+44%2020%203249%208448>
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >       E:      michal.borowie...@openbet.com
> > > >> > >       W:      www.openbet.com <http://www.openbet.com/>
> > > >> > >
> > > >> > >
> > > >> > >       OpenBet Ltd
> > > >> > >
> > > >> > >       Chiswick Park Building 9
> > > >> > >
> > > >> > >       566 Chiswick High Rd
> > > >> > >
> > > >> > >       London
> > > >> > >
> > > >> > >       W4 5XT
> > > >> > >
> > > >> > >       UK
> > > >> > >
> > > >> > >
> > > >> > > <https://www.openbet.com/email_promo>
> > > >> > >
> > > >> > > This message is confidential and intended only for the
> addressee.
> > If
> > > >> you
> > > >> > > have received this message in error, please immediately notify
> the
> > > >> > > postmas...@openbet.com <mailto:postmas...@openbet.com> and
> delete
> > > it
> > > >> > > from your system as well as any copies. The content of e-mails
> as
> > > well
> > > >> > > as traffic data may be monitored by OpenBet for employment and
> > > >> security
> > > >> > > purposes. To protect the environment please do not print this
> > e-mail
> > > >> > > unless necessary. OpenBet Ltd. Registered Office: Chiswick Park
> > > >> Building
> > > >> > > 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A
> > company
> > > >> > > registered in England and Wales. Registered no. 3134634. VAT no.
> > > >> > > GB927523612
> > > >> > >
> > > >> >
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>

Reply via email to