Hi,

I am just catching up on this thread. (1) As most people agree, we
should not add anything to KStreamBuilder (btw: we actually plan to move
#merge() to KStream and deprecate it on KStreamBuilder as it's quite an
unnatural API atm).

About specifying Serdes: there is still the idea to improve the overall
API from the current "we are adding more overloads"-pattern to a
builder-like pattern. This might make the whole discussion void if we do
this. Thus, it might make sense to keep this in mind (or even delay this
KIP?). It seems a waste of time to discuss all this if we are going to
change it in 2 months anyway... Just saying.
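
To make the contrast concrete, here is a minimal, self-contained sketch of
what a builder-like pattern could look like compared to adding one overload
per combination of optional parameters. It is only an illustration under
stated assumptions: AggregateOptions, withStoreName, withValueSerde and
describeAggregate are hypothetical names, not part of Kafka Streams, and the
serde is modelled as a plain string so the example runs without any
dependencies.

```java
import java.util.Optional;

// Hypothetical sketch: none of these types exist in Kafka Streams.
public class BuilderPatternSketch {

    /** Optional settings gathered in one immutable object instead of one overload per combination. */
    public static final class AggregateOptions {
        private final String storeName;      // null means "no queryable store name given"
        private final String valueSerdeName; // stand-in for a Serde<T>; null means "use the default"

        private AggregateOptions(String storeName, String valueSerdeName) {
            this.storeName = storeName;
            this.valueSerdeName = valueSerdeName;
        }

        public static AggregateOptions defaults() {
            return new AggregateOptions(null, null);
        }

        // Each "with" method returns a new object, so calls can be chained in any order.
        public AggregateOptions withStoreName(String name) {
            return new AggregateOptions(name, valueSerdeName);
        }

        public AggregateOptions withValueSerde(String serdeName) {
            return new AggregateOptions(storeName, serdeName);
        }

        public Optional<String> storeName() {
            return Optional.ofNullable(storeName);
        }

        public Optional<String> valueSerde() {
            return Optional.ofNullable(valueSerdeName);
        }
    }

    /**
     * A single entry point replaces the family of overloads. A real method would
     * also take the initializer and aggregator; they are omitted here for brevity.
     */
    public static String describeAggregate(AggregateOptions options) {
        return "store=" + options.storeName().orElse("<internal>")
             + ", serde=" + options.valueSerde().orElse("<default>");
    }

    public static void main(String[] args) {
        System.out.println(describeAggregate(AggregateOptions.defaults()));
        System.out.println(describeAggregate(
            AggregateOptions.defaults().withStoreName("counts").withValueSerde("LongSerde")));
    }
}
```

With a shape like this, a new optional parameter becomes one more "with"
method rather than a new set of overloads, which is why the serde placement
question might look different under such an API.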


-Matthias

On 6/13/17 8:05 AM, Michal Borowiecki wrote:
> You're right, I haven't thought of that.
> 
> Cheers,
> 
> Michał
> 
> 
> On 13/06/17 13:00, Kyle Winkelman wrote:
>> First, I would prefer not calling it aggregate because there are already
>> plenty of aggregate methods.
>>
>> Second, I don't think this would really work because after each aggregate
>> you now have a unique KTable (someone may want a table with 4 streams and
>> reuse those 4 in another table but with one more stream added) and unless
>> we completely duplicate everything every time this isn't really possible.
>> Additionally, the cogroup way just requires 1 more call to create two
>> different tables (normal, windowed, and session windowed); this new way
>> would require copying the aggregate chain.
>>
>> Another way to think about it is with cogroup we know that when they call
>> aggregate they aren't going to be adding any more aggregators to that table
>> but your way requires us to assume they are done adding aggregators after
>> each call so we must return a KTable just to possibly not need to have
>> created it.
>>
>> On Jun 13, 2017 5:20 AM, "Michal Borowiecki" <michal.borowie...@openbet.com>
>> wrote:
>>
>>> Actually, just had a thought. It started with naming.
>>>
>>> Are we actually co-grouping these streams or are we co-aggregating them?
>>>
>>> After all, in each of the cogroup calls we are providing an Aggregator
>>> implementation.
>>>
>>>
>>> If they are really co-aggregated, why don't we turn this around:
>>> KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
>>> KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
>>> KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
>>>
>>> KTable<K, CG> coagg = grouped1.aggregate(initializer1, aggregator1,
>>> aggValueSerde1) // this is the unchanged aggregate method
>>>         .aggregate(grouped2, aggregator2)  // this is a new method
>>>         .aggregate(grouped3, aggregator3); // ditto
>>>
>>> This means instead of adding cogroup methods on the KGroupedStream interface,
>>> adding an aggregate method on the KTable interface.
>>>
>>> Is that feasible?
>>>
>>> Cheers,
>>> Michał
>>>
>>> On 13/06/17 10:56, Michal Borowiecki wrote:
>>>
>>> Also, I still feel that putting initializer on the first cogroup can
>>> mislead users into thinking the first stream is in some way special.
>>> Just my 5c.
>>> Michał
>>>
>>> On 13/06/17 09:54, Michal Borowiecki wrote:
>>>
>>> Agree completely with the argument for serdes belonging in the same place
>>> as the state store name, which is in the aggregate method.
>>>
>>> Cheers,
>>>
>>> Michał
>>>
>>> On 12/06/17 18:20, Xavier Léauté wrote:
>>>
>>> I think we are discussing two separate things here, so it might be worth
>>> clarifying:
>>>
>>> 1) the position of the initializer with respect to the aggregators. If I
>>> understand correctly, Guozhang seems to think it is more natural to specify
>>> the initializer first, despite it not bearing any relation to the first
>>> aggregator. I can see the argument for specifying the initializer first,
>>> but I think it is debatable whether mixing it into the first cogroup call
>>> leads to a cleaner API or not.
>>>
>>> 2) where the serde should be defined (if necessary). Looking at our
>>> existing APIs in KGroupedStream, we always offer two aggregate()
>>> methods. The first one takes the name of the store and associated aggregate
>>> value serde e.g. KGroupedStream.aggregate(Initializer<VR> initializer,
>>> Aggregator<? super K, ? super V, VR> aggregator, Serde<VR> aggValueSerde,
>>> String queryableStoreName)
>>> The second one only takes a state store supplier, and does not specify any
>>> serde, e.g. KGroupedStream.aggregate(Initializer<VR>
>>> initializer, Aggregator<? super K, ? super V, VR> aggregator, final
>>> StateStoreSupplier<KeyValueStore> storeSupplier)
>>> Presumably, when specifying a state store supplier it shouldn't be
>>> necessary to specify an aggregate value serde, since the provided
>>> statestore might not need to serialize the values (e.g. it may just keep
>>> them as regular objects in heap) or it may have its own
>>> internal serialization format.
>>>
>>> For consistency I think it would be valuable to preserve the same two
>>> aggregate methods for cogroup as well. Since the serde is only required in
>>> one of the two cases, I believe the serde has no place in the first
>>> cogroup() call and should only have to be specified as part of the
>>> aggregate() method that takes a state store name. In the case of a state
>>> store supplier, no serde would be necessary.
>>>
>>>
>>> On Sat, Jun 10, 2017 at 4:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> I'd agree that the aggregate value serde and the initializer do not
>>>> bear a direct relationship with the first `cogroup` call, but after I tried
>>>> to write some example code with these two different sets of APIs I felt the
>>>> current APIs just program more naturally.
>>>>
>>>> I know it is kinda subjective, but I do think that user experience may be
>>>> more important as a deciding factor than the logical argument for public
>>>> interfaces. So I'd recommend people also try writing some example
>>>> lines, and we can circle back and discuss which one feels more natural
>>>> to write code with.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Fri, Jun 9, 2017 at 1:59 AM, Michal Borowiecki <
>>>> michal.borowie...@openbet.com> wrote:
>>>>
>>>>> I feel it would make more sense to move the initializer and serde to the
>>>>> final aggregate statement, since the serde only applies to the state
>>>>> store, and the initializer doesn't bear any relation to the first group in
>>>>> particular.
>>>>>
>>>>> +1 for moving initializer and serde from cogroup() to the aggregate()
>>>>> for the reasons mentioned above.
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Michał
>>>>>
>>>>> On 08/06/17 22:44, Guozhang Wang wrote:
>>>>>
>>>>> Note that although the internal `AbstractStoreSupplier` does maintain the
>>>>> key-value serdes, we do not enforce the interface of `StateStoreSupplier`
>>>>> to always retain that information, and hence we cannot assume that
>>>>> StateStoreSuppliers always retain key / value serdes.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:51 AM, Xavier Léauté <xav...@confluent.io> wrote:
>>>>>
>>>>> Another reason for the serde not to be in the first cogroup call is that
>>>>> the serde should not be required if you pass a StateStoreSupplier to
>>>>> aggregate().
>>>>>
>>>>> Regarding the aggregated type <T>, I don't see why the initializer should be
>>>>> favored over the aggregator to define the type. In my mind, separating the
>>>>> initializer into the last aggregate call clearly indicates that the
>>>>> initializer is independent of any of the aggregators or streams and that
>>>>> we don't wait for grouped1 events to initialize the co-group.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:14 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> On a second thought... This is the currently proposed API
>>>>>
>>>>> ```
>>>>> <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
>>>>>         final Aggregator<? super K, ? super V, T> aggregator,
>>>>>         final Serde<T> aggValueSerde)
>>>>> ```
>>>>>
>>>>> If we do not have the initializer in the first co-group it might be a bit
>>>>> awkward for users to specify the aggregator that returns a typed <T>
>>>>> value? Maybe it is still better to put these two functions in the same api?
>>>>>
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Jun 8, 2017 at 11:08 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> This suggestion lgtm. I would vote for the first alternative rather than
>>>>> adding it to the `KStreamBuilder` though.
>>>>>
>>>>> On Thu, Jun 8, 2017 at 10:58 AM, Xavier Léauté <xav...@confluent.io> wrote:
>>>>>
>>>>> I have a minor suggestion to make the API a little bit more symmetric.
>>>>> I feel it would make more sense to move the initializer and serde to
>>>>> the final aggregate statement, since the serde only applies to the state
>>>>> store, and the initializer doesn't bear any relation to the first group in
>>>>> particular. It would end up looking like this:
>>>>>
>>>>> KTable<K, CG> cogrouped =
>>>>>     grouped1.cogroup(aggregator1)
>>>>>             .cogroup(grouped2, aggregator2)
>>>>>             .cogroup(grouped3, aggregator3)
>>>>>             .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>
>>>>> Alternatively, we could move the first cogroup() method to
>>>>> KStreamBuilder, similar to how we have .merge(),
>>>>> and end up with an API that would be even more symmetric.
>>>>>
>>>>> KStreamBuilder.cogroup(grouped1, aggregator1)
>>>>>               .cogroup(grouped2, aggregator2)
>>>>>               .cogroup(grouped3, aggregator3)
>>>>>               .aggregate(initializer1, aggValueSerde, storeName1);
>>>>>
>>>>> This doesn't have to be a blocker, but I thought it would make the API
>>>>> just a tad cleaner.
>>>>>
>>>>> On Tue, Jun 6, 2017 at 3:59 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> Kyle,
>>>>>
>>>>> Thanks a lot for the updated KIP. It looks good to me.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Fri, Jun 2, 2017 at 5:37 AM, Jim Jagielski <j...@jagunet.com> wrote:
>>>>>
>>>>> This makes much more sense to me. +1
>>>>>
>>>>>
>>>>> On Jun 1, 2017, at 10:33 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>> I have updated the KIP and my PR. Let me know what you think.
>>>>> To create a cogrouped stream just call cogroup on a KGroupedStream and
>>>>> supply the initializer, aggValueSerde, and an aggregator. Then continue
>>>>> adding KGroupedStreams and aggregators. Then call one of the many aggregate
>>>>> calls to create a KTable.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On Jun 1, 2017 4:03 AM, "Damian Guy" <damian....@gmail.com> wrote:
>>>>>
>>>>> Hi Kyle,
>>>>>
>>>>> Thanks for the update. I think just one initializer makes sense as it
>>>>> should only be called once per key and generally it is just going to create
>>>>> a new instance of whatever the Aggregate class is.
>>>>>
>>>>> Cheers,
>>>>> Damian
>>>>>
>>>>> On Wed, 31 May 2017 at 20:09 Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>> Hello all,
>>>>>
>>>>> I have spent some more time on this and the best alternative I have come up
>>>>> with is:
>>>>> KGroupedStream has a single cogroup call that takes an initializer and an
>>>>> aggregator.
>>>>> CogroupedKStream has a cogroup call that takes additional groupedStream
>>>>> aggregator pairs.
>>>>> CogroupedKStream has multiple aggregate methods that create the different
>>>>> stores.
>>>>>
>>>>> I plan on updating the kip but I want people's input on if we should have
>>>>> the initializer be passed in once at the beginning or if we should instead
>>>>> have the initializer be required for each call to one of the aggregate
>>>>> calls. The first makes more sense to me but doesn't allow the user to
>>>>> specify different initializers for different tables.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On May 24, 2017 7:46 PM, "Kyle Winkelman" <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>> Yea I really like that idea I'll see what I can do to update the kip and
>>>>> my pr when I have some time. I'm not sure how well creating the
>>>>> kstreamaggregates will go though because at that point I will have thrown
>>>>> away the type of the values. It will be type safe I just may need to do a
>>>>> little forcing.
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> On May 24, 2017 3:28 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>>>
>>>>> Kyle,
>>>>>
>>>>> Thanks for the explanations, my previous read on the wiki examples was
>>>>> wrong.
>>>>>
>>>>> So I guess my motivation should be "reduced" to: can we move the window
>>>>> specs param from "KGroupedStream#cogroup(..)" to
>>>>> "CogroupedKStream#aggregate(..)", and my motivations are:
>>>>>
>>>>> 1. minor: we can reduce the #.generics in CogroupedKStream from 3 to 2.
>>>>>
>>>>> 2. major: this is for extensibility of the APIs, and since we are removing
>>>>> the "Evolving" annotations on Streams it may be harder to change it again
>>>>> in the future. The extended use cases are that people wanted to have
>>>>> windowed running aggregates on different granularities, e.g. "give me the
>>>>> counts per-minute, per-hour, per-day and per-week", and today in DSL we
>>>>> need to specify that case in multiple aggregate operators, which gets a
>>>>> state store / changelog, etc. And it is possible to optimize it as well to
>>>>> a single state store. Its implementation would be tricky as you need to
>>>>> contain different lengthed windows within your window store but just from
>>>>> the public API point of view, it could be specified as:
>>>>>
>>>>> CogroupedKStream stream = stream1.cogroup(stream2, ...
>>>>> "state-store-name");
>>>>>
>>>>> table1 = stream.aggregate(/*per-minute window*/)
>>>>> table2 = stream.aggregate(/*per-hour window*/)
>>>>> table3 = stream.aggregate(/*per-day window*/)
>>>>>
>>>>> while underlying we are only using a single store "state-store-name" for
>>>>> it.
>>>>>
>>>>>
>>>>> Although this feature is out of the scope of this KIP, I'd like to discuss
>>>>> if we can "leave the door open" to make such changes without modifying the
>>>>> public APIs.
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 3:57 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>> I allow defining a single window/sessionwindow one time when you make the
>>>>> cogroup call from a KGroupedStream. From then on you are using the cogroup
>>>>> call from within CogroupedKStream which doesn't accept any additional
>>>>> windows/sessionwindows.
>>>>>
>>>>> Is this what you meant by your question or did I misunderstand?
>>>>>
>>>>> On May 23, 2017 9:33 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>>>>>
>>>>> Another question that came to me is on "window alignment": from the KIP it
>>>>> seems you are allowing users to specify a (potentially different) window
>>>>> spec in each co-grouped input stream. So if these window specs are
>>>>> different how should we "align" them with different input streams? I think
>>>>> it is more natural to only specify one window spec in the
>>>>>
>>>>> KTable<RK, V> CogroupedKStream#aggregate(Windows);
>>>>>
>>>>>
>>>>> And remove it from the cogroup() functions. WDYT?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, May 23, 2017 at 6:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>> Thanks for the proposal Kyle, this is a quite common use case to support
>>>>> such multi-way table join (i.e. N source tables with N aggregate func) with
>>>>> a single store and N+1 serdes, I have seen lots of people using the
>>>>> low-level PAPI to achieve this goal.
>>>>>
>>>>>
>>>>> On Fri, May 19, 2017 at 10:04 AM, Kyle Winkelman <winkelman.k...@gmail.com> wrote:
>>>>>
>>>>> I like your point about not handling other cases such as count and reduce.
>>>>>
>>>>> I think that reduce may not make sense because reduce assumes that the
>>>>> input values are the same as the output values. With cogroup
>>>>>
>>>>> ...
> 
