Hi,

Thanks for your comments. I think we cannot extend the two interfaces if we
want to keep lambdas. I updated the KIP [1]. Maybe I should change the
title, because now we are not limiting the KIP to only ValueMapper,
ValueTransformer and ValueJoiner.
Please feel free to comment.
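To illustrate the lambda point: if `ValueMapperWithKey` extended `ValueMapper`, it would inherit a second abstract method and stop being a functional interface (unless one of the methods got a default implementation), so `(k, v) -> ...` would no longer compile. The sketch below keeps the two interfaces separate and offers one overload per interface. Java picks the overload from the lambda's arity, so both lambda forms keep working. The interfaces and the static `MapValuesOverloads.mapValues` helper are simplified, hypothetical stand-ins, not the Kafka Streams API.

```java
// Hypothetical, simplified stand-ins for the interfaces discussed in this thread.
// Each has exactly one abstract method, so each can be implemented by a lambda.
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

class MapValuesOverloads {
    // Overload for the key-less mapper: the key is simply not passed on.
    static <K, V, VR> VR mapValues(K key, V value, ValueMapper<V, VR> mapper) {
        return mapper.apply(value);
    }

    // Overload for the key-aware mapper.
    static <K, V, VR> VR mapValues(K key, V value, ValueMapperWithKey<K, V, VR> mapper) {
        return mapper.apply(key, value);
    }
}
```

`mapValues("k1", "abc", v -> v.length())` resolves to the key-less overload and returns 3, while `mapValues("k1", "abc", (k, v) -> k + ":" + v)` resolves to the key-aware overload and returns `"k1:abc"`.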

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner


Cheers,
Jeyhun

On Tue, May 9, 2017 at 7:36 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> If `ValueMapperWithKey` extends `ValueMapper`, we don't need the new
> overload.
>
> And yes, we need to do one check -- but this happens when building the
> topology. At runtime (I mean after KafkaStreams#start()) we don't need any
> check, as we will always use `ValueMapperWithKey`.
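A minimal sketch of the build-time check described above, assuming simplified stand-in types (`MapValuesProcessor` and `TopologyBuilderSketch` are hypothetical names): the overload chosen while building the topology adapts a plain `ValueMapper` into a `ValueMapperWithKey` once, so the per-record path only ever calls the key-aware variant and needs no `instanceof` check or cast.

```java
// Hypothetical, simplified stand-ins (not the real Kafka Streams types).
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}

// The processor only ever holds the key-aware variant, so processing a
// record is a single interface call with no instanceof check or cast.
class MapValuesProcessor<K, V, VR> {
    private final ValueMapperWithKey<K, V, VR> mapper;

    MapValuesProcessor(ValueMapperWithKey<K, V, VR> mapper) {
        this.mapper = mapper;
    }

    VR process(K key, V value) {
        return mapper.apply(key, value);
    }
}

class TopologyBuilderSketch {
    // Runs once while building the topology: adapt the key-less mapper
    // by wrapping it in a lambda that ignores the key.
    static <K, V, VR> MapValuesProcessor<K, V, VR> mapValues(ValueMapper<V, VR> mapper) {
        return new MapValuesProcessor<K, V, VR>((key, value) -> mapper.apply(value));
    }

    // The key-aware mapper is used as-is.
    static <K, V, VR> MapValuesProcessor<K, V, VR> mapValues(ValueMapperWithKey<K, V, VR> mapper) {
        return new MapValuesProcessor<K, V, VR>(mapper);
    }
}
```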
>
>
> -Matthias
>
>
> On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> > Hi,
> >
> > Thanks for feedback.
> > Then we need to overload the method
> >   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > with
> >   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper);
> >
> > and at runtime (inside the processor) we still have to check whether it is a
> > ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
> >
> >
> > Please correct me if I am wrong.
> >
> > Cheers,
> > Jeyhun
> >
> >
> >
> >
> > On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> > michal.borowie...@openbet.com> wrote:
> >
> >> +1 :)
> >>
> >>
> >> On 08/05/17 23:52, Matthias J. Sax wrote:
> >>> Hi,
> >>>
> >>> I was reading the updated KIP and I am wondering if we should do the
> >>> design a little differently.
> >>>
> >>> Instead of distinguishing between a RichFunction and a non-RichFunction at
> >>> the runtime level, we would use RichFunctions all the time. Thus, at the DSL
> >>> entry level, if a user provides a non-RichFunction, we wrap it in a
> >>> RichFunction that is fully implemented by Streams. This RichFunction
> >>> would just forward the call, omitting the key:
> >>>
> >>> Just to sketch the idea (incomplete code snippet):
> >>>
> >>>> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
> >>>>    private final ValueMapper<V, VR> userProvidedMapper; // set by constructor
> >>>>
> >>>>    public VR apply(final K key, final V value) {
> >>>>        return userProvidedMapper.apply(value); // forward the call, omitting the key
> >>>>    }
> >>>> }
> >>>
> >>> From a performance point of view, I am not sure whether the
> >>> "if (isRichFunction)" check, including casts etc., would have more overhead
> >>> than this approach (we would do one more nested method call for
> >>> non-RichFunctions, which should be more common than RichFunctions).
> >>>
> >>> This approach should not affect lambdas (or am I missing something?) and
> >>> might be cleaner, as we could have one more top-level interface
> >>> `RichFunction` with methods `init()` and `close()` and also interfaces
> >>> for `RichValueMapper` etc. (thus, no abstract classes required).
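A runnable sketch of this design, assuming hypothetical `RichFunction`, `RichValueMapper`, and `StreamsRichValueMapper` types shaped as described above (these are not a released Kafka Streams API): the library wraps a plain `ValueMapper` in a rich mapper with no-op `init()`/`close()`, and the equally hypothetical `RichRuntime` only ever handles `RichValueMapper`, so there is no per-record "is this a rich function?" branch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interfaces sketching the proposal; not a released Kafka Streams API.
interface RichFunction {
    void init();   // called once before the first record
    void close();  // called once after the last record
}

interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface RichValueMapper<K, V, VR> extends RichFunction {
    VR apply(K readOnlyKey, V value);
}

// Library-side wrapper: adapts a plain, lambda-friendly ValueMapper
// to RichValueMapper with no-op lifecycle methods.
class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
    private final ValueMapper<V, VR> userProvidedMapper;

    StreamsRichValueMapper(ValueMapper<V, VR> userProvidedMapper) {
        this.userProvidedMapper = userProvidedMapper;
    }

    @Override
    public void init() { }  // nothing to set up

    @Override
    public void close() { } // nothing to release

    @Override
    public VR apply(K readOnlyKey, V value) {
        return userProvidedMapper.apply(value); // forward, omitting the key
    }
}

// Hypothetical runtime loop: it only ever sees RichValueMapper.
class RichRuntime {
    static <K, V, VR> List<VR> processAll(RichValueMapper<K, V, VR> mapper, List<K> keys, List<V> values) {
        List<VR> out = new ArrayList<>();
        mapper.init();
        try {
            for (int i = 0; i < keys.size(); i++) {
                out.add(mapper.apply(keys.get(i), values.get(i)));
            }
        } finally {
            mapper.close();
        }
        return out;
    }
}
```

Because `RichValueMapper` has three abstract methods, users implementing it directly would write a class rather than a lambda; the wrapper is what keeps the plain `ValueMapper` path lambda-friendly.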
> >>>
> >>>
> >>> Any thoughts?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> >>>> Hi,
> >>>>
> >>>> Thanks for the comments. I extended the PR and KIP to include rich functions.
> >>>> I will still have to evaluate the cost of deep copying keys.
> >>>>
> >>>> Cheers,
> >>>> Jeyhun
> >>>>
> >>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
> >>>> wrote:
> >>>>
> >>>>> Hey Matthias,
> >>>>>
> >>>>> My opinion would be that documenting the immutability of the key is the
> >>>>> best approach available. It is better than requiring the key to be
> >>>>> serializable (as in Jeyhun's last pass at the PR), and it carries no
> >>>>> performance risk.
> >>>>>
> >>>>> It'd be different if Java had immutable type constraints of some kind. :-)
> >>>>>
> >>>>> Mathieu
> >>>>>
> >>>>>
> >>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Agreed about RichFunction. If we follow this path, it should cover
> >>>>>> all(?) DSL interfaces.
> >>>>>>
> >>>>>> About guarding the key -- I am still not sure what to do about it...
> >>>>>> Maybe it would be enough to document it (and name the key parameter
> >>>>>> something like `readOnlyKey` to make it very clear). Ultimately, I would
> >>>>>> prefer to guard against any modification, but I have no good idea how to
> >>>>>> do this. Not sure what others think about the risk of corrupted
> >>>>>> partitioning (which would be a user error, and we could say: well, bad
> >>>>>> luck, you have a bug in your code, that's not our fault) vs. a deep copy
> >>>>>> with a potential performance hit (which we can't quantify atm without a
> >>>>>> performance test).
> >>>>>> We do have a performance system test. Maybe it's worth it for you to apply
> >>>>>> the deep copy strategy and run the test. It's only a very basic
> >>>>>> performance test, but it might give some insight. If you want to do this,
> >>>>>> look into the folder "tests" for the general test setup, and into
> >>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
> >>>>>>> Hi Matthias,
> >>>>>>>
> >>>>>>> I think extending the KIP to include RichFunctions totally makes sense.
> >>>>>>> So, we don't want to guard the keys because it is costly.
> >>>>>>> If we introduce RichFunctions, I think they should not be limited to the 3
> >>>>>>> interfaces proposed in the KIP but should cover a wide range of interfaces.
> >>>>>>> Please correct me if I am wrong.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jeyhun
> >>>>>>>
> >>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> One follow-up. There was this email on the user list:
> >>>>>>>>
> >>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
> >>>>>>>>
> >>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
> >>>>>>>> interfaces, too.
> >>>>>>>>
> >>>>>>>> And we should double-check whether there are other interfaces we might
> >>>>>>>> be missing atm.
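As a sketch of what a key-aware initializer could look like, assuming a hypothetical `InitializerWithKey` interface and a toy in-memory `KeyedAggregation.aggregate` helper (the `Aggregator` here only mirrors the `(key, value, aggregate)` shape of Kafka Streams' `Aggregator`): the initializer is called once per key, the first time that key is seen, and can use the key to build the initial aggregate.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical key-aware initializer.
interface InitializerWithKey<K, VA> {
    VA apply(K readOnlyKey);
}

// Mirrors the (key, value, aggregate) shape of Kafka Streams' Aggregator.
interface Aggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

class KeyedAggregation {
    // Aggregates records in arrival order; the initializer runs once per key,
    // the first time that key is seen.
    static <K, V, VA> Map<K, VA> aggregate(List<Map.Entry<K, V>> records,
                                          InitializerWithKey<K, VA> initializer,
                                          Aggregator<K, V, VA> aggregator) {
        Map<K, VA> state = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            VA current = state.containsKey(key) ? state.get(key) : initializer.apply(key);
            state.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return state;
    }
}
```

For input records `("a", 1)`, `("b", 2)`, `("a", 3)` with initializer `k -> k + ":"` and aggregator `(k, v, agg) -> agg + v`, the result maps `"a"` to `"a:13"` and `"b"` to `"b:2"`.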
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
> >>>>>>>>> Thanks for updating the KIP.
> >>>>>>>>>
> >>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
> >>>>>>>>> worried about the performance impact... We might want to do some tests
> >>>>>>>>> to quantify this impact.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
> >>>>>>>>> would allow users to access record metadata (like timestamp, offset,
> >>>>>>>>> partition, etc.) within the DSL. This would be a similar concept. Thus,
> >>>>>>>>> I am wondering if it would make sense to enlarge the scope of this KIP
> >>>>>>>>> accordingly? WDYT?
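One way such metadata access could look, as a hypothetical sketch (`RecordContext`, `ContextAwareValueMapper`, `MutableRecordContext`, and `OriginTaggingMapper` are illustrative names, loosely modelled on how a Processor reads `topic()`, `partition()`, `offset()`, and `timestamp()` from its `ProcessorContext`): the function receives a context object once in `init()`, and the runtime updates that object before each `apply()` call.

```java
// Hypothetical read-only view of the current record's metadata.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Hypothetical "rich" mapper: receives the context once, then reads it per record.
interface ContextAwareValueMapper<K, V, VR> {
    void init(RecordContext context);
    VR apply(K readOnlyKey, V value);
}

// Context object that a (hypothetical) runtime updates before each apply() call.
class MutableRecordContext implements RecordContext {
    private String topic;
    private int partition;
    private long offset;
    private long timestamp;

    void update(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
}

// Example user function: tags each value with the topic, partition, and offset it came from.
class OriginTaggingMapper implements ContextAwareValueMapper<String, String, String> {
    private RecordContext context;

    @Override
    public void init(RecordContext context) {
        this.context = context;
    }

    @Override
    public String apply(String readOnlyKey, String value) {
        return value + "@" + context.topic() + "-" + context.partition() + ":" + context.offset();
    }
}
```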
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Matthias
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
> >>>>>>>>>> Hi Mathieu,
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the feedback. I followed a similar approach and updated the
> >>>>>>>>>> PR and KIP accordingly. I tried to guard the key in the Processors by
> >>>>>>>>>> sending a copy of the actual key.
> >>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
> >>>>>>>>>> bottleneck in some use cases.
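A sketch of one generic way to deep-copy a key, assuming Java serialization is used (Mathieu's later reply mentions that this pass of the PR required serializable keys, but the exact copy mechanism is not shown in the thread; `DeepCopy.copy` is a hypothetical helper). Each call encodes and decodes the whole key into a fresh byte buffer, which is where the extra memory and CPU cost per record comes from.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class DeepCopy {
    // Deep-copies a key by round-tripping it through Java serialization.
    // Requires the key type to be Serializable; allocates a full encoded
    // copy plus a new object graph on every call.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copy(T key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deep-copy key", e);
        }
    }
}
```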
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Jeyhun
> >>>>>>>>>>
> >>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>
> >>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather
> >>>>>>>>>>> than interfaces, which is also a backwards-incompatible change. An
> >>>>>>>>>>> alternative approach that would be backwards compatible would be to
> >>>>>>>>>>> define new interfaces, and provide overloads where those interfaces
> >>>>>>>>>>> are used.
> >>>>>>>>>>>
> >>>>>>>>>>> Unfortunately, making the key parameter "final" doesn't change much
> >>>>>>>>>>> about guarding against key changes. It only prevents the parameter
> >>>>>>>>>>> variable from being reassigned. If the key type is a mutable object
> >>>>>>>>>>> (e.g. byte[]), it can still be mutated (e.g. key[0] = 0). But I'm not
> >>>>>>>>>>> really sure there's much that can be done about that.
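A minimal illustration of this point (the class and method names are hypothetical): `final` on a `byte[]` parameter blocks reassignment of the parameter, but not writes into the array it refers to, so the caller's key still changes.

```java
class FinalKeyDemo {
    // 'final' only stops the parameter from being reassigned;
    // the byte[] it points to can still be mutated by user code.
    static String apply(final byte[] key, final String value) {
        // key = new byte[0];  // would NOT compile: key is final
        key[0] = 0;            // compiles, and mutates the caller's array
        return value;
    }
}
```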
> >>>>>>>>>>>
> >>>>>>>>>>> Mathieu
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the comments.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The concerns make sense. Although we can guard the keys against
> >>>>>>>>>>>> modification in the current implementation (with a few changes), I
> >>>>>>>>>>>> didn't consider backward compatibility.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In this case, 2 solutions come to my mind. In both cases, the user
> >>>>>>>>>>>> accesses the key as type Object, as passing an extra type parameter
> >>>>>>>>>>>> would break backwards compatibility. So the user has to cast to the
> >>>>>>>>>>>> actual key type.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and
> >>>>>>>>>>>> value) and force the key to be *final*. By doing this, I think we can
> >>>>>>>>>>>> address both backward compatibility and guarding against key changes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Secondly, we can create a class KeyAccess like:
> >>>>>>>>>>>>
> >>>>>>>>>>>> public class KeyAccess {
> >>>>>>>>>>>>      Object key;
> >>>>>>>>>>>>      public void beforeApply(final Object key) {
> >>>>>>>>>>>>          this.key = key;
> >>>>>>>>>>>>      }
> >>>>>>>>>>>>      public Object getKey() {
> >>>>>>>>>>>>          return key;
> >>>>>>>>>>>>      }
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
> >>>>>>>>>>>> *KeyAccess*. Inside the processor (for example
> >>>>>>>>>>>> *KTableMapValuesProcessor*), before calling *mapper.apply(value)* we
> >>>>>>>>>>>> can set the *key* via *mapper.beforeApply(key)*. As a result, the user
> >>>>>>>>>>>> can use *getKey()* to access the key inside the *apply(value)* method.
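A runnable sketch of option 2, reusing the `KeyAccess` shape above (with the field made private). `ValueMapper` is redefined here as a hypothetical abstract class so it can inherit from `KeyAccess`, and `KTableMapValuesProcessorSketch` and `KeyPrefixMapper` are hypothetical names standing in for the real processor and a user mapper. As Mathieu's reply points out, turning the interfaces into classes is itself backwards incompatible, and it also rules out lambdas.

```java
// KeyAccess as sketched above, with the field made private.
class KeyAccess {
    private Object key;

    public void beforeApply(final Object key) {
        this.key = key;
    }

    public Object getKey() {
        return key;
    }
}

// Hypothetical: ValueMapper as an abstract class so that it can extend KeyAccess.
// Being a class, it can no longer be implemented with a lambda.
abstract class ValueMapper<V, VR> extends KeyAccess {
    public abstract VR apply(V value);
}

// Stand-in for a processor such as KTableMapValuesProcessor (hypothetical).
class KTableMapValuesProcessorSketch {
    static <K, V, VR> VR process(ValueMapper<V, VR> mapper, K key, V value) {
        mapper.beforeApply(key);    // expose the key to the mapper
        return mapper.apply(value); // the existing one-argument call is unchanged
    }
}

// Example user mapper: the key is only available as Object, so it must be cast.
class KeyPrefixMapper extends ValueMapper<String, String> {
    @Override
    public String apply(String value) {
        return (String) getKey() + "=" + value;
    }
}
```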
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Jeyhun,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> thanks a lot for the KIP!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think there are two issues we need to address:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did
> >>>>>>>>>>>>> complain about this in past releases already, and as the user base
> >>>>>>>>>>>>> grows, we should not break backward compatibility in future releases
> >>>>>>>>>>>>> anymore. Thus, we should think of a better way to allow key access.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Mathieu's comment goes in the same direction:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need
> >>>>>>>>>>>>>>> to be fixed from this change is unfortunate. :-)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code
> >>>>>>>>>>>>> from modifying the key. This might corrupt partitioning if users do
> >>>>>>>>>>>>> alter the key (accidentally -- or users are just not aware that they
> >>>>>>>>>>>>> are not allowed to modify the provided key object) and thus break
> >>>>>>>>>>>>> the application. (This was the original motivation for not providing
> >>>>>>>>>>>>> the key in the first place -- it guards against modification.)
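To make the partitioning risk concrete, here is a simplified, hypothetical hash-based partition assignment (`PartitionDemo.partitionFor` is not Kafka's default partitioner, which hashes the serialized key with murmur2). Because the partition is a function of the key's contents, mutating the key object in place changes the partition it maps to, while the record itself stays on the partition it was originally routed to.

```java
import java.util.Arrays;

class PartitionDemo {
    // Simplified hash-based partition assignment, for illustration only.
    // It only shows that the result depends on the key's contents.
    static int partitionFor(byte[] key, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(key), numPartitions);
    }
}
```

With 4 partitions, the key `{1, 2, 3}` maps to partition 1; after `key[0] = 0` it maps to partition 0.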
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
> >>>>>>>>>>>>>> Hi Jeyhun,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to
> >>>>>>>>>>>>>> the record key during a mapValues or similar operation.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to
> >>>>>>>>>>>>>> be fixed from this change is unfortunate. :-)  But at least it would
> >>>>>>>>>>>>>> all be a pretty clear and easy change.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Mathieu
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>> Dear community,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2],
> >>>>>>>>>>>>>>> KAFKA-4726 [3], and KAFKA-3745 [4]. The related PR can be found at [5].
> >>>>>>>>>>>>>>> I would like to get your comments.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
> >>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
> >>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
> >>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
> >>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> -Cheers
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>
