On Sun, Aug 8, 2021 at 3:56 PM Matthias J. Sax <mj...@apache.org> wrote:

> >>> Not sure. Also, do you have an opinion on Long vs Double ?
>
> Not sure what you mean by `Long vs Double` ? Can you elaborate?
>
We were discussing the return value for "sum". Alex suggested using
"Double" and you had "Long" in one of your responses, I think. Do you have a
preference or a reason to go one way or the other?
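
To make the Long vs Double trade-off concrete, here is a rough sketch of the
two accumulators in plain Java. SumSketch, sumAsLong and sumAsDouble are
made-up names for illustration only and are not part of Kafka Streams or the
KIP; the only real API used is Number.longValue() / Number.doubleValue().

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not Kafka Streams API: folds a list of values the way
// a sum() aggregation might, once per candidate result type.
final class SumSketch {

    // Long accumulator: every extracted Number is narrowed with longValue(),
    // so any fractional part is dropped before it is added.
    static <V> long sumAsLong(List<V> values, Function<V, Number> func) {
        long total = 0L;
        for (V value : values) {
            total += func.apply(value).longValue();
        }
        return total;
    }

    // Double accumulator: every extracted Number is widened with
    // doubleValue(), so fractions survive, but long inputs with a magnitude
    // above 2^53 may no longer be represented exactly.
    static <V> double sumAsDouble(List<V> values, Function<V, Number> func) {
        double total = 0.0;
        for (V value : values) {
            total += func.apply(value).doubleValue();
        }
        return total;
    }
}
```

If func returns 1.5, 2.5 and 3 for three records, the Long version adds
1 + 2 + 3 and yields 6, while the Double version yields 7.0. So Long is only
safe when the inputs are known to be integral.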

-mohan
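
PS: On the Comparator variant of min() discussed further down the thread, and
the open question of what happens when compare() returns 0, here is a minimal
sketch in plain Java of a reducer that keeps the older value on ties.
MinSketch and minKeepingOlder are hypothetical names; nothing here is Kafka
Streams API, and it assumes the first argument is the current aggregate and
the second is the incoming value.

```java
import java.util.Comparator;
import java.util.function.BinaryOperator;

// Hypothetical helper, not Kafka Streams API.
final class MinSketch {

    // Builds a reducer that keeps the current minimum and replaces it only
    // when the new value compares strictly smaller. On compare() == 0 the
    // older value wins, so no new result would need to be forwarded.
    static <V> BinaryOperator<V> minKeepingOlder(Comparator<V> comparator) {
        return (currentMin, newValue) ->
            comparator.compare(newValue, currentMin) < 0 ? newValue : currentMin;
    }
}
```

A max() would be the same with the comparison flipped. Preferring the newer
value on ties would only require changing "< 0" to "<= 0".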


> -Matthias
>
> On 8/8/21 7:41 AM, Mohan Parthasarathy wrote:
> > On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>  interface ComparableStream<K, V extends Comparable<V>>
> >>     extends KStream<K, V>
> >>  {
> >>     KTable<K, V> min();
> >>  }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream<String, String> stream =
> >>     new StreamsBuilder().stream("");
> >>   KTable<String, String> table =
> >>     ((ComparableStream<String, String>) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >>
> >> So maybe passing in a `Comparator<V>` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
> >>
> >> Yeah, I tried both and using Comparator seems more natural to me in this
> > case. I will update the document with the discussion here.
> >
> >
> >
> >>
> >> I am not sure why we would want to pass `Function<V, Comparable<?>>
> >> func` into `min()`?
> >>
> >> Not sure. Also, do you have an opinion on Long vs Double ?
> >
> > -thanks
> > Mohan
> >
> >
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
> >> alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Mohan / Matthias,
> >>>>
> >>>>>> I think extending min/max to non-numeric types makes sense.
> Wondering
> >>>>>> why we should require a `Comparator` or if we should require that
> the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>> Good question. This is what it would look like:
> >>>>>
> >>>>> KTable<K, V> min_comparable()
> >>>>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>
> >>>> Not sure if I understood Matthias' proposal correctly, but I think that
> >>>> instead of going with
> >>>> your original proposal (<VR extends Number> KTable<K, VR>
> >> min(Function<V,
> >>>> VR> func...)
> >>>> or mine (KTable<K, V> min(Comparator<V> comparator...), we could
> >> simplify
> >>>> it a
> >>>> bit by using a function to extract a Comparable property from the
> >> original
> >>>> value:
> >>>>
> >>>> KTable<K, V> min(Function<V, Comparable<?>> func...)
> >>>>
> >>>> I will let Matthias clarify. I am not sure why it is simpler than the
> >>> comparator one. Comparable is implemented by the type and not sure
> >> exposing
> >>> it this way makes it any better.
> >>>
> >>>> I also think, that min/max should not change the value type. Using
> >>>>> `Long` for sum() makes sense though, and also to require a `<V extends
> >>>>> Number>`.
> >>>>
> >>>> Are there any reasons to limit the sum() to integers? Why not use a
> >> Double
> >>>> instead?
> >>>>
> >>>> Yeah, if the precision is important, then we should stick with Double.
> >>>
> >>> -mohan
> >>>
> >>> Best regards,
> >>>> Alexandre
> >>>>
> >>>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
> >> mposde...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Matthias,
> >>>>>
> >>>>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
> >>>> <mj...@mailbox.org.invalid
> >>>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I think extending min/max to non-numeric types makes sense.
> Wondering
> >>>>>> why we should require a `Comparator` or if we should require that
> the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>>> Good question. This is what it would look like:
> >>>>>
> >>>>> KTable<K, V> min_comparable()
> >>>>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>>
> >>>>> For min_comparable to work, you still need the bounds "V extends
> >>>>> Comparable<
> >>>>> V>". AFAICT, to avoid the "type parameter V hiding the type V"
> warning,
> >>>> it
> >>>>> has to be at the interface level like this:
> >>>>>
> >>>>>  KStream<K,  V extends Comparable<V>>
> >>>>>
> >>>>> which is a little messy unless there is a different way to do the
> same.
> >>>> The
> >>>>> comparator gives a simple way to define an anonymous function.
> >>>>>
> >>>>> What do you think ?
> >>>>>
> >>>>>
> >>>>>> I also think, that min/max should not change the value type. Using
> >>>>>> `Long` for sum() makes sense though, and also to require a `<V
> extends
> >>>>>> Number>`.
> >>>>>>
> >>>>>> I guess these are the two possibilities:
> >>>>>
> >>>>> <E extends Number> Long sum(Function<V, E> func)
> >>>>> Long sum(Function<V, Number> func)
> >>>>>
> >>>>> Both should work. "func" can return any subtypes of Number and I
> don't
> >>>> see
> >>>>> any advantages with the first version. Can you clarify ?
> >>>>>
> >>>>> Thanks
> >>>>> Mohan
> >>>>>
> >>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> >>>>>>> Hi Alex,
> >>>>>>>
> >>>>>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
> >>>>>> alexandre.bra...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>>
> >>>>>>>> My point here is that, when we're only interested in a max/min
> >>>> numeric
> >>>>>>>> value, it doesn't
> >>>>>>>> matter when we have repeated values, since we'd be only forwarding
> >>>> the
> >>>>>>>> number downstream,
> >>>>>>>> so I could disregard when the Comparator returns a zero value
> >>>> (meaning
> >>>>>>>> equals) and min/max
> >>>>>>>> would still be semantically correct. But when we're forwarding the
> >>>>>> original
> >>>>>>>> object downstream
> >>>>>>>> instead of its numeric property, it could mean different things
> >>>>>>>> semantically depending on how
> >>>>>>>> we handle the repeated values.
> >>>>>>>>
> >>>>>>>> As an example, if I were using max() on a stream of Biddings for
> >>>>>> products
> >>>>>>>> in an auction, the
> >>>>>>>> order of the biddings would probably influence the winner if two
> >>>>> clients
> >>>>>>>> send Biddings with the
> >>>>>>>> same value. If we're only forwarding the Bidding value downstream
> (a
> >>>>>> double
> >>>>>>>> value of 100, for
> >>>>>>>> example), it doesn't matter how repeated values are handled, since
> >>>> the
> >>>>>> max
> >>>>>>>> price for this
> >>>>>>>> auction would still be 100.00, no matter what Bidding got selected
> >>>> in
> >>>>>> the
> >>>>>>>> end. But if we're
> >>>>>>>> forwarding the Biddings downstream instead, then it matters
> whether
> >>>>> the
> >>>>>>>> winning Bidding sent
> >>>>>>>> downstream was originally posted by Client A or Client B.
> >>>>>>>>
> >>>>>>>> I'm not saying that an overloaded method to handle different
> options
> >>>>> for
> >>>>>>>> how repeated values
> >>>>>>>> should be handled by min/max is mandatory, but it should be clear
> on
> >>>>> the
> >>>>>>>> methods' docs
> >>>>>>>> what would happen when Comparator.compare() == 0. My preferred
> >>>> option
> >>>>>> for
> >>>>>>>> the default
> >>>>>>>> behaviour is to only forward a new value if it is smaller/bigger than
> the
> >>>>>>>> previous min/max value
> >>>>>>>> (ignoring compare() == 0), since it would emit fewer values
> >>>> downstream
> >>>>>> and
> >>>>>>>> would be easier
> >>>>>>>> to read ("I only send a value downstream if it's bigger/smaller
> than
> >>>>> the
> >>>>>>>> previously selected
> >>>>>>>> value").
> >>>>>>>>
> >>>>>>> Thanks for the clarification. I like your suggestion unless someone
> >>>>> feels
> >>>>>>> that they want an option to control this (i.e., when compare() ==
> 0,
> >>>>>> return
> >>>>>>> the old value vs new value).
> >>>>>>>
> >>>>>>>
> >>>>>>>>
> >>>>>>>>> Not knowing the schema of the value (V) has its own set of
> >>>> problems.
> >>>>>> As I
> >>>>>>>> have alluded to
> >>>>>>>>> in the proposal, this is a little bit messy. We already have
> >>>> "reduce"
> >>>>>>>> which can be used to
> >>>>>>>>> implement sum (mapValues().reduce()).
> >>>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
> >>>>> hacky
> >>>>>>>> way to implement
> >>>>>>>>> this is to inspect the type of the return when the "func" is
> called
> >>>>> the
> >>>>>>>> first time OR infer from
> >>>>>>>>> the materialized or have an explicit initializer.
> >>>>>>>>
> >>>>>>>> I think it might be useful for some use cases, yes, but it would
> be
> >>>>>> tricky
> >>>>>>>> to implement this in a
> >>>>>>>> way that handles generic Numbers and keeps their original
> >>>>> implementation
> >>>>>>>> class. One
> >>>>>>>> simplification you could make is fixing VR to be a Double, and
> >>>> then
> >>>>>> use
> >>>>>>>> Number.doubleValue()
> >>>>>>>> to compute the sum.
> >>>>>>>>
> >>>>>>>
> >>>>>>> Yeah, that would simplify quite a bit. I think you are suggesting
> >>>> this:
> >>>>>>>
> >>>>>>> KTable<K,Double> sum(Function<V, Number> func)
> >>>>>>>
> >>>>>>>
> >>>>>>>> What you said about using reduce() to compute a sum() is also true
> >>>> for
> >>>>>>>> min() and max(). =) All
> >>>>>>>> three methods in this KIP would be a syntactic sugar for what
> could
> >>>>>>>> otherwise be implemented
> >>>>>>>> using reduce/aggregate, but I see value in implementing them and
> >>>>>>>> simplifying the adoption of
> >>>>>>>> those use cases.
> >>>>>>>>
> >>>>>>>> Agreed. I seem to have forgotten the reason as to why I started
> this
> >>>>> KIP
> >>>>>>> :-). There is a long way to go.
> >>>>>>>
> >>>>>>> -thanks
> >>>>>>> Mohan
> >>>>>>>
> >>>>>>> Best regards,
> >>>>>>>> Alexandre
> >>>>>>>>
> >>>>>>>> On Sat, Jun 5, 2021 at 10:17 PM Mohan Parthasarathy <
> >>>>>> mposde...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Alex,
> >>>>>>>>>
> >>>>>>>>> Responses below.
> >>>>>>>>>
> >>>>>>>>> On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil <
> >>>>>>>>> alexandre.bra...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Mohan,
> >>>>>>>>>>
> >>>>>>>>>> I like the idea of adding those methods to the API, but I'd like
> >>>> to
> >>>>>>>> make
> >>>>>>>>> a
> >>>>>>>>>> suggestion:
> >>>>>>>>>>
> >>>>>>>>>> Although the most used scenario for min() / max() might possibly
> >>>> be
> >>>>>> for
> >>>>>>>>>> numeric values, I think they could also be
> >>>>>>>>>> useful on other objects like Dates, LocalDates or Strings. Why
> >>>> limit
> >>>>>>>> the
> >>>>>>>>>> API to Numbers only?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>> There was no specific reason. Just addressing the common
> scenario.
> >>>>> But
> >>>>>> I
> >>>>>>>>> don't see why this can't be supported given your suggestion
> below.
> >>>>>>>>>
> >>>>>>>>> Extending on the above, couldn't we change the API to provide a
> >>>>>>>>>> Comparator<V> instead of the Function<V, VR>
> >>>>>>>>>> for those methods, and make them return a KTable<K, V> instead?
> >>>> Not
> >>>>>>>> only
> >>>>>>>>>> would this approach not limit the
> >>>>>>>>>> usage of those methods to Numbers, but they'd also preserve the
> >>>>> origin
> >>>>>>>>> from
> >>>>>>>>>> the min/max value [1]. The extraction of
> >>>>>>>>>> a single (numeric?) value could be achieved by a subsequent
> >>>>>>>> .mapValues()
> >>>>>>>>>> operator, and this strategy would also
> >>>>>>>>>> allow us to reuse the stream's current value serde on min / max,
> >>>>>> making
> >>>>>>>>> the
> >>>>>>>>>> Materialized an optional parameter.
> >>>>>>>>>>
> >>>>>>>>>> I like your idea though it is odd that min/max returns
> KTable<K,
> >>>> V>
> >>>>>>>>> instead of the KTable<K, VR> (like in count), but mapValues
> should
> >>>> do
> >>>>>> the
> >>>>>>>>> trick.
> >>>>>>>>>
> >>>>>>>>> One extra complication of this approach is that now we'd have to
> >>>>> handle
> >>>>>>>>>> repeated min/max values from different
> >>>>>>>>>> origins (two semantically different objects for which the
> >>>> comparator
> >>>>>>>>>> returns 0), but we could solve that by adding
> >>>>>>>>>> a parameter to specify whether to use the older or newer value
> (or
> >>>>>>>>> assuming
> >>>>>>>>>> one of these options as default for a
> >>>>>>>>>> simpler API?).
> >>>>>>>>>>
> >>>>>>>>>> I am not sure whether this complexity is warranted. Why can't we
> >>>>> just
> >>>>>>>>> stick to the way a regular Comparator works ? Can you give me a
> >>>> real
> >>>>>>>> world
> >>>>>>>>> example ?
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I know it's an implementation issue, but I'm curious about how
> you'd
> >>>>>> solve
> >>>>>>>>>> handling the <VR extends Number> on
> >>>>>>>>>> the sum(). Since the multiple implementations of this interface
> >>>>> don't
> >>>>>>>>> have
> >>>>>>>>>> a common constructor nor an interface
> >>>>>>>>>> method to add two Numbers, would it be possible to implement
> sum()
> >>>>> and
> >>>>>>>>>> retain the original VR type on the
> >>>>>>>>>> returned KTable?
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Not knowing the schema of the value (V) has its own set of
> >>>> problems.
> >>>>>> As I
> >>>>>>>>> have alluded to in the proposal, this is a little bit messy. We
> >>>>> already
> >>>>>>>>> have "reduce" which can be used to implement sum
> >>>>>> (mapValues().reduce()).
> >>>>>>>>> Thinking about it more, do you think "sum" would be useful ? One
> >>>>> hacky
> >>>>>>>> way
> >>>>>>>>> to implement this is to inspect the type of the return when the
> >>>>> "func"
> >>>>>> is
> >>>>>>>>> called the first time OR infer from the materialized or have an
> >>>>>> explicit
> >>>>>>>>> initializer.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Mohan
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> [1]: An example scenario for this would be to find the min / max
> >>>>>>>> Bidding
> >>>>>>>>>> for a product where, at the end of the
> >>>>>>>>>> auction, I need not only the min / max value of said Bidding,
> but
> >>>>> also
> >>>>>>>>> the
> >>>>>>>>>> bidder's contact information.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Alexandre
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy <
> >>>>>>>> mposde...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> I have created a proposal for adding some additional
> aggregation
> >>>>> APIs
> >>>>>>>>>> like
> >>>>>>>>>>> count.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >>>>>>>>>>>
> >>>>>>>>>>> I have noted down some of the issues that need discussion.
> Thanks
> >>>>> to
> >>>>>>>>>>> Matthias for helping me with the scope of the proposal.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks
> >>>>>>>>>>> Mohan
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
