On Sun, Aug 8, 2021 at 3:56 PM Matthias J. Sax <mj...@apache.org> wrote:

> >>> Not sure. Also, do you have an opinion on Long vs Double ?
>
> Not sure what you mean by `Long vs Double` ? Can you elaborate?
>

We were discussing the return value for "sum". Alex suggested using "Double"
and you had "Long" in one of your responses I think. Do you have a preference
or reason one way or the other ?

-mohan

> -Matthias
>
> On 8/8/21 7:41 AM, Mohan Parthasarathy wrote:
> > On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>   interface ComparableStream<K, V extends Comparable<V>>
> >>       extends KStream<K, V>
> >>   {
> >>       KTable<K, V> min();
> >>   }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream<String, String> stream =
> >>       new StreamsBuilder().stream("");
> >>   KTable<String, String> table =
> >>       ((ComparableStream<String, String>) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >> So maybe passing in a `Comparator<V>` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
> >>
> > Yeah, I tried both and using Comparator seems more natural to me in this
> > case. I will update the document with the discussion here.
> >
> >>
> >> I am not sure why we would want to pass `Function<V, Comparable<?>>
> >> func` into `min()`?
> >>
> > Not sure. Also, do you have an opinion on Long vs Double ?
> >
> > -thanks
> > Mohan
> >
> >>
> >> -Matthias
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Mohan / Matthias,
> >>>>
> >>>>>> I think extending min/max to non-numeric types makes sense. Wondering
> >>>>>> why we should require a `Comparator` or if we should require that the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>> Good question. This is what it would look like:
> >>>>>
> >>>>>   KTable<K, V> min_comparable()
> >>>>>   KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>
> >>>> Not sure if I understood Matthias' proposal correctly, but I think that
> >>>> instead of going with your original proposal
> >>>> (<VR extends Number> KTable<K, VR> min(Function<V, VR> func...)
> >>>> or mine (KTable<K, V> min(Comparator<V> comparator...), we could simplify
> >>>> it a bit by using a function to extract a Comparable property from the
> >>>> original value:
> >>>>
> >>>>   KTable<K, V> min(Function<V, Comparable<?>> func...)
> >>>>
> >>> I will let Matthias clarify. I am not sure why it is simpler than the
> >>> comparator one. Comparable is implemented by the type and not sure
> >>> exposing it this way makes it any better.
> >>>
> >>>>> I also think that min/max should not change the value type. Using
> >>>>> `Long` for sum() makes sense though, and also to require a `<V extends
> >>>>> Number>`.
> >>>>
> >>>> Are there any reasons to limit the sum() to integers? Why not use a
> >>>> Double instead?
> >>>>
> >>> Yeah, if the precision is important, then we should stick with Double.
> >>>
> >>> -mohan
> >>>
> >>>> Best regards,
> >>>> Alexandre
> >>>>
> >>>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <mposde...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Matthias,
> >>>>>
> >>>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J.
> >>>>> Sax <mj...@mailbox.org.invalid> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I think extending min/max to non-numeric types makes sense. Wondering
> >>>>>> why we should require a `Comparator` or if we should require that the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>> Good question. This is what it would look like:
> >>>>>
> >>>>>   KTable<K, V> min_comparable()
> >>>>>   KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>>
> >>>>> For min_comparable to work, you still need the bounds "V extends
> >>>>> Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
> >>>>> warning, it has to be at the interface level like this:
> >>>>>
> >>>>>   KStream<K, V extends Comparable<V>>
> >>>>>
> >>>>> which is a little messy unless there is a different way to do the same.
> >>>>> The comparator gives a simple way to define an anonymous function.
> >>>>>
> >>>>> What do you think ?
> >>>>>
> >>>>>> I also think that min/max should not change the value type. Using
> >>>>>> `Long` for sum() makes sense though, and also to require a `<V extends
> >>>>>> Number>`.
> >>>>>>
> >>>>> I guess these are the two possibilities:
> >>>>>
> >>>>>   <E extends Number> Long sum(Function<V, E> func)
> >>>>>   Long sum(Function<V, Number> func)
> >>>>>
> >>>>> Both should work. "func" can return any subtypes of Number and I don't
> >>>>> see any advantages with the first version. Can you clarify ?
> >>>>>
> >>>>> Thanks
> >>>>> Mohan
> >>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> >>>>>>> Hi Alex,
> >>>>>>>
> >>>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <alexandre.bra...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> My point here is that, when we're only interested in a max/min
> >>>>>>>> numeric value, it doesn't matter when we have repeated values,
> >>>>>>>> since we'd be only forwarding the number downstream, so I could
> >>>>>>>> disregard when the Comparator returns a zero value (meaning
> >>>>>>>> equals) and min/max would still be semantically correct. But when
> >>>>>>>> we're forwarding the original object downstream instead of its
> >>>>>>>> numeric property, it could mean different things semantically
> >>>>>>>> depending on how we handle the repeated values.
> >>>>>>>>
> >>>>>>>> As an example, if I were using max() on a stream of Biddings for
> >>>>>>>> products in an auction, the order of the biddings would probably
> >>>>>>>> influence the winner if two clients send Biddings with the same
> >>>>>>>> value. If we're only forwarding the Bidding value downstream (a
> >>>>>>>> double value of 100, for example), it doesn't matter how repeated
> >>>>>>>> values are handled, since the max price for this auction would
> >>>>>>>> still be 100.00, no matter what Bidding got selected in the end.
> >>>>>>>> But if we're forwarding the Biddings downstream instead, then it
> >>>>>>>> matters whether the winning Bidding sent downstream was originally
> >>>>>>>> posted by Client A or Client B.
> >>>>>>>>
> >>>>>>>> I'm not saying that an overloaded method to handle different
> >>>>>>>> options for how repeated values should be handled by min/max is
> >>>>>>>> mandatory, but it should be clear on the methods' docs what would
> >>>>>>>> happen when Comparator.compare() == 0. My preferred option for the
> >>>>>>>> default behaviour is to only forward a new value if it is
> >>>>>>>> smaller/bigger than the previous min/max value (ignoring
> >>>>>>>> compare() == 0), since it would emit fewer values downstream and
> >>>>>>>> would be easier to read ("I only send a value downstream if it's
> >>>>>>>> bigger/smaller than the previously selected value").
> >>>>>>>>
> >>>>>>> Thanks for the clarification. I like your suggestion unless someone
> >>>>>>> feels that they want an option to control this (i.e., when
> >>>>>>> compare() == 0, return the old value vs new value).
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> Not knowing the schema of the value (V) has its own set of
> >>>>>>>>> problems. As I have alluded to in the proposal, this is a little
> >>>>>>>>> bit messy. We already have "reduce" which can be used to
> >>>>>>>>> implement sum (mapValues().reduce()).
> >>>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
> >>>>>>>>> hacky way to implement this is to inspect the type of the return
> >>>>>>>>> when the "func" is called the first time OR infer from the
> >>>>>>>>> materialized or have an explicit initializer.
> >>>>>>>>
> >>>>>>>> I think it might be useful for some use cases, yes, but it would be
> >>>>>>>> tricky to implement this in a way that handles generic Numbers and
> >>>>>>>> keeps their original implementation class.
> >>>>>>>> One simplification you could take is fixating VR to be a Double,
> >>>>>>>> and then use Number.doubleValue() to compute the sum.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Yeah, that would simplify quite a bit. I think you are suggesting
> >>>>>>> this:
> >>>>>>>
> >>>>>>>   KTable<K,Double> sum(Function<V, Number> func)
> >>>>>>>
> >>>>>>>> What you said about using reduce() to compute a sum() is also true
> >>>>>>>> for min() and max(). =) All three methods in this KIP would be a
> >>>>>>>> syntactic sugar for what could otherwise be implemented using
> >>>>>>>> reduce/aggregate, but I see value in implementing them and
> >>>>>>>> simplifying the adoption of those use cases.
> >>>>>>>>
> >>>>>>> Agreed. I seem to have forgotten the reason as to why I started this
> >>>>>>> KIP :-). There is a long way to go.
> >>>>>>>
> >>>>>>> -thanks
> >>>>>>> Mohan
> >>>>>>>
> >>>>>>>> Best regards,
> >>>>>>>> Alexandre
> >>>>>>>>
> >>>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <mposde...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Alex,
> >>>>>>>>>
> >>>>>>>>> Responses below.
> >>>>>>>>>
> >>>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <alexandre.bra...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Mohan,
> >>>>>>>>>>
> >>>>>>>>>> I like the idea of adding those methods to the API, but I'd like
> >>>>>>>>>> to make a suggestion:
> >>>>>>>>>>
> >>>>>>>>>> Although the most used scenario for min() / max() might possibly
> >>>>>>>>>> be for numeric values, I think they could also be useful on other
> >>>>>>>>>> objects like Dates, LocalDates or Strings. Why limit the API to
> >>>>>>>>>> Numbers only?
> >>>>>>>>>>
> >>>>>>>>> There was no specific reason. Just addressing the common scenario.
> >>>>>>>>> But I don't see why this can't be supported given your suggestion
> >>>>>>>>> below.
> >>>>>>>>>
> >>>>>>>>>> Extending on the above, couldn't we change the API to provide a
> >>>>>>>>>> Comparator<V> instead of the Function<V, VR> for those methods,
> >>>>>>>>>> and make them return a KTable<K, V> instead? Not only would this
> >>>>>>>>>> approach not limit the usage of those methods to Numbers, but
> >>>>>>>>>> they'd also preserve the origin from the min/max value [1]. The
> >>>>>>>>>> extraction of a single (numeric?) value could be achieved by a
> >>>>>>>>>> subsequent .mapValues() operator, and this strategy would also
> >>>>>>>>>> allow us to reuse the stream's current value serde on min / max,
> >>>>>>>>>> making the Materialized an optional parameter.
> >>>>>>>>>>
> >>>>>>>>> I like your idea though it is odd that min/max returns KTable<K, V>
> >>>>>>>>> instead of the KTable<K, VR> (like in count), but mapValues should
> >>>>>>>>> do the trick.
> >>>>>>>>>
> >>>>>>>>>> One extra complication of this approach is that now we'd have to
> >>>>>>>>>> handle repeated min/max values from different origins (two
> >>>>>>>>>> semantically different objects for which the comparator returns
> >>>>>>>>>> 0), but we could solve that by adding a parameter to specify
> >>>>>>>>>> whether to use the older or newer value (or assuming one of these
> >>>>>>>>>> options as default for a simpler API?).
> >>>>>>>>>>
> >>>>>>>>> I am not sure whether this complexity is warranted. Why can't we
> >>>>>>>>> just stick to the way a regular Comparator works ? Can you give me
> >>>>>>>>> a real world example ?
> >>>>>>>>>
> >>>>>>>>>> I know it's an implementation issue, but I'm curious on how you'd
> >>>>>>>>>> solve handling the <VR extends Number> on the sum(). Since the
> >>>>>>>>>> multiple implementations of this interface don't have a common
> >>>>>>>>>> constructor nor an interface method to add two Numbers, would it
> >>>>>>>>>> be possible to implement sum() and retain the original VR type on
> >>>>>>>>>> the returned KTable?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Not knowing the schema of the value (V) has its own set of
> >>>>>>>>> problems. As I have alluded to in the proposal, this is a little
> >>>>>>>>> bit messy. We already have "reduce" which can be used to implement
> >>>>>>>>> sum (mapValues().reduce()).
> >>>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
> >>>>>>>>> hacky way to implement this is to inspect the type of the return
> >>>>>>>>> when the "func" is called the first time OR infer from the
> >>>>>>>>> materialized or have an explicit initializer.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Mohan
> >>>>>>>>>
> >>>>>>>>>> [1]: An example scenario for this would be to find the min / max
> >>>>>>>>>> Bidding for a product where, at the end of the auction, I need not
> >>>>>>>>>> only the min / max value of said Bidding, but also the bidder's
> >>>>>>>>>> contact information.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Alexandre
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <mposde...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I have created a proposal for adding some additional aggregation
> >>>>>>>>>>> APIs like count.
> >>>>>>>>>>>
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >>>>>>>>>>>
> >>>>>>>>>>> I have noted down some of the issues that need discussion. Thanks
> >>>>>>>>>>> to Matthias for helping me with the scope of the proposal.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>> Mohan
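
For readers of the archive, here is a minimal sketch of the two behaviours debated in this thread: a Comparator-based min/max that keeps the earlier value when compare() == 0, and a sum() fixed to Double via Number.doubleValue(). It is plain Java with no Kafka Streams types, and it is not the KIP's implementation. The names AggregationSketch, Bid, maxKeepingFirst, minKeepingFirst and the list-based sum helper are all hypothetical, chosen only to illustrate the semantics.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Illustrative only: every name in this class is hypothetical and
// nothing here is part of the Kafka Streams API.
final class AggregationSketch {

    // Hypothetical value type for the auction example from the thread.
    static final class Bid {
        final String client;
        final double amount;

        Bid(String client, double amount) {
            this.client = client;
            this.amount = amount;
        }
    }

    // Reducer for max(): the candidate replaces the current value only when
    // it is strictly greater, so on compare() == 0 the earlier value is kept.
    static <V> BinaryOperator<V> maxKeepingFirst(Comparator<V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) > 0 ? candidate : current;
    }

    // Reducer for min(): same tie rule, with the comparison reversed.
    static <V> BinaryOperator<V> minKeepingFirst(Comparator<V> comparator) {
        return (current, candidate) ->
                comparator.compare(candidate, current) < 0 ? candidate : current;
    }

    // sum() with the result type fixed to double: any Number subtype is
    // accepted and folded in through Number.doubleValue().
    static <V> double sum(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        // Client A and Client B bid the same amount; A arrives first.
        List<Bid> bids = List.of(
                new Bid("A", 100.0), new Bid("B", 100.0), new Bid("C", 90.0));
        Comparator<Bid> byAmount = Comparator.comparingDouble(bid -> bid.amount);

        // Apply the reducer record by record, as a stream would.
        BinaryOperator<Bid> max = maxKeepingFirst(byAmount);
        Bid winner = bids.get(0);
        for (Bid bid : bids.subList(1, bids.size())) {
            winner = max.apply(winner, bid);
        }

        System.out.println("winner=" + winner.client);
        System.out.println("sum=" + sum(bids, bid -> bid.amount));
    }
}
```

With this tie rule the winning Bid stays with Client A, since Client B's equal bid never replaces it. Flipping the comparison to >= (or <= for min) would give the "newer value wins" variant that an optional parameter could select.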