If `ValueMapperWithKey` extends `ValueMapper` we don't need the new overload.
And yes, we need to do one check -- but this happens when building the
topology. At runtime (I mean after KafkaStreams#start()) we don't need any
check, as we will always use `ValueMapperWithKey`.

-Matthias

On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> Hi,
>
> Thanks for feedback.
> Then we need to overload method
>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> with
>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super V, ? extends VR> mapper);
>
> and in runtime (inside processor) we still have to check it is ValueMapper
> or ValueMapperWithKey before wrapping it into the rich function.
>
> Please correct me if I am wrong.
>
> Cheers,
> Jeyhun
>
> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
>
>> +1 :)
>>
>> On 08/05/17 23:52, Matthias J. Sax wrote:
>>> Hi,
>>>
>>> I was reading the updated KIP and I am wondering, if we should do the
>>> design a little different.
>>>
>>> Instead of distinguishing between a RichFunction and non-RichFunction at
>>> runtime level, we would use RichFunctions all the time. Thus, on the DSL
>>> entry level, if a user provides a non-RichFunction, we wrap it by a
>>> RichFunction that is fully implemented by Streams. This RichFunction
>>> would just forward the call omitting the key:
>>>
>>> Just to sketch the idea (incomplete code snippet):
>>>
>>>> public StreamsRichValueMapper implements RichValueMapper() {
>>>>     private ValueMapper userProvidedMapper; // set by constructor
>>>>
>>>>     public VR apply(final K key, final V1 value1, final V2 value2) {
>>>>         return userProvidedMapper(value1, value2);
>>>>     }
>>>> }
>>>
>>> From a performance point of view, I am not sure if the
>>> "if(isRichFunction)" including casts etc would have more overhead than
>>> this approach (we would do more nested method call for non-RichFunction
>>> which should be more common than RichFunctions).
>>>
>>> This approach should not affect lambdas (or do I miss something?)
>>> and might be cleaner, as we could have one more top level interface
>>> `RichFunction` with methods `init()` and `close()` and also interfaces
>>> for `RichValueMapper` etc. (thus, no abstract classes required).
>>>
>>> Any thoughts?
>>>
>>> -Matthias
>>>
>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>>>> Hi,
>>>>
>>>> Thanks for comments. I extended PR and KIP to include rich functions. I
>>>> will still have to evaluate the cost of deep copying of keys.
>>>>
>>>> Cheers,
>>>> Jeyhun
>>>>
>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>> wrote:
>>>>
>>>>> Hey Matthias,
>>>>>
>>>>> My opinion would be that documenting the immutability of the key is the
>>>>> best approach available. Better than requiring the key to be serializable
>>>>> (as with Jeyhun's last pass at the PR), no performance risk.
>>>>>
>>>>> It'd be different if Java had immutable type constraints of some kind. :-)
>>>>>
>>>>> Mathieu
>>>>>
>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Agreed about RichFunction. If we follow this path, it should cover
>>>>>> all(?) DSL interfaces.
>>>>>>
>>>>>> About guarding the key -- I am still not sure what to do about it...
>>>>>> Maybe it might be enough to document it (and name the key parameter like
>>>>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>>>>>> guard against any modification, but I have no good idea how to do this.
>>>>>> Not sure what others think about the risk of corrupted partitioning
>>>>>> (what would be a user error and we could say, well, bad luck, you got a
>>>>>> bug in your code, that's not our fault), vs deep copy with a potential
>>>>>> performance hit (that we can't quantify atm without any performance
>>>>>> test).
>>>>>> We do have a performance system test. Maybe it's worth for you to apply
>>>>>> the deep copy strategy and run the test.
>>>>>> It's very basic performance
>>>>>> test only, but might give some insight. If you want to do this, look
>>>>>> into folder "tests" for general test setup, and into
>>>>>> "tests/kafaktests/benchmarks/streams" to find the perf test.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> I think extending KIP to include RichFunctions totally makes sense. So,
>>>>>>> we don't want to guard the keys because it is costly.
>>>>>>> If we introduce RichFunctions I think it should not be limited only the 3
>>>>>>> interfaces proposed in KIP but to wide range of interfaces.
>>>>>>> Please correct me if I am wrong.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One follow up. There was this email on the user list:
>>>>>>>>
>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>>>>
>>>>>>>> It might make sense to include Initializer, Adder, and Subtractor
>>>>>>>> interface, too.
>>>>>>>>
>>>>>>>> And we should double check if there are other interface we might miss atm.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>
>>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>>>>>>>>> worried about performance impact... We might want to do some test to
>>>>>>>>> quantify this impact.
>>>>>>>>>
>>>>>>>>> Btw: this remind me about the idea of `RichFunction` interface that
>>>>>>>>> would allow users to access record metadata (like timestamp, offset,
>>>>>>>>> partition etc) within DSL. This would be a similar concept. Thus, I am
>>>>>>>>> wondering, if it would make sense to enlarge the scope of this KIP by
>>>>>>>>> that? WDYT?
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>>>>> Hi Mathieu,
>>>>>>>>>>
>>>>>>>>>> Thanks for feedback. I followed similar approach and updated PR and KIP
>>>>>>>>>> accordingly. I tried to guard the key in Processors sending a copy of an
>>>>>>>>>> actual key.
>>>>>>>>>> Because I am doing deep copy of an object, I think memory can be bottleneck
>>>>>>>>>> in some use-cases.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>
>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
>>>>>>>>>>> interfaces, which is also a backwards incompatible change. An alternative
>>>>>>>>>>> approach that would be backwards compatible would be to define new
>>>>>>>>>>> interfaces, and provide overrides where those interfaces are used.
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, making the key parameter as "final" doesn't change much
>>>>>>>>>>> about guarding against key change. It only prevents the parameter variable
>>>>>>>>>>> from being reassigned. If the key type is a mutable object (eg. byte[]),
>>>>>>>>>>> it can still be mutated. (eg. key[0] = 0). But I'm not really sure there's
>>>>>>>>>>> much that can be done about that.
>>>>>>>>>>>
>>>>>>>>>>> Mathieu
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for comments.
>>>>>>>>>>>>
>>>>>>>>>>>> The concerns makes sense. Although we can guard for immutable keys in
>>>>>>>>>>>> current implementation (with few changes), I didn't consider backward
>>>>>>>>>>>> compatibility.
>>>>>>>>>>>>
>>>>>>>>>>>> In this case 2 solutions come to my mind. In both cases, user accesses the
>>>>>>>>>>>> key in Object type, as passing extra type parameter will break
>>>>>>>>>>>> backwards-compatibility. So user has to cast to actual key type.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Firstly, We can overload apply method with 2 argument (key and value)
>>>>>>>>>>>> and force key to be *final*. By doing this, I think we can address both
>>>>>>>>>>>> backward-compatibility and guarding against key change.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Secondly, we can create class KeyAccess like:
>>>>>>>>>>>>
>>>>>>>>>>>> public class KeyAccess {
>>>>>>>>>>>>     Object key;
>>>>>>>>>>>>     public void beforeApply(final Object key) {
>>>>>>>>>>>>         this.key = key;
>>>>>>>>>>>>     }
>>>>>>>>>>>>     public Object getKey() {
>>>>>>>>>>>>         return key;
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
>>>>>>>>>>>> *KeyAccess*. Inside processor (for example *KTableMapValuesProcessor*)
>>>>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
>>>>>>>>>>>> *mapper.beforeApply(key)*. As a result, user can use *getKey()* to access
>>>>>>>>>>>> the key inside *apply(value)* method.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>>>>
>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility.
>>>>>>>>>>>>> Users did complain
>>>>>>>>>>>>> about this in past releases already, and as the user base grows, we
>>>>>>>>>>>>> should not break backward compatibility in future releases anymore.
>>>>>>>>>>>>> Thus, we should think of a better way to allow key access.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mathieu's comment goes into the same direction
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> (2) Another concern is, that there is no guard to prevent user code to
>>>>>>>>>>>>> modify the key. This might corrupt partitioning if users do alter the
>>>>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
>>>>>>>>>>>>> allowed to modify the provided key object) and thus break the
>>>>>>>>>>>>> application. (This was the original motivation to not provide the key in
>>>>>>>>>>>>> the first place -- it guards against modification.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I just want to add my voice that, I too, have wished for access to the
>>>>>>>>>>>>>> record key during a mapValues or similar operation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>>>> fixed from this change is unfortunate. :-) But at least it would all be a
>>>>>>>>>>>>>> pretty clear and easy change.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
>>>>>>>>>>>>>>> KAFKA-3745 [4]. The related PR can be found at [5].
>>>>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>> --
>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun
>>>>>>> --
>>>>>>> -Cheers
>>>>>>>
>>>>>>> Jeyhun
> --
> -Cheers
>
> Jeyhun
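For readers of the archive, here is a minimal sketch of the "check once while building the topology" idea from the top of this message. It is only an illustration under stated assumptions: the two interfaces are local stand-ins and not the Kafka Streams types, and `withKey` and `MapValuesProcessor` are hypothetical names. A key-less mapper is adapted once into the key-aware form, so the processor always calls `ValueMapperWithKey` and needs no per-record type check.

```java
public class WrapSketch {
    // Local stand-ins for illustration only; NOT the Kafka Streams interfaces.
    public interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    public interface ValueMapperWithKey<K, V, VR> {
        VR apply(K readOnlyKey, V value);
    }

    // Hypothetical helper, called once while the topology is being built:
    // adapts a key-less mapper by forwarding the value and dropping the key.
    public static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> mapper) {
        return (readOnlyKey, value) -> mapper.apply(value);
    }

    // Hypothetical processor: it only knows the key-aware form,
    // so there is no instanceof check or cast on the per-record path.
    public static final class MapValuesProcessor<K, V, VR> {
        private final ValueMapperWithKey<K, V, VR> mapper;

        public MapValuesProcessor(final ValueMapperWithKey<K, V, VR> mapper) {
            this.mapper = mapper;
        }

        public VR process(final K key, final V value) {
            return mapper.apply(key, value);
        }
    }

    public static void main(String[] args) {
        // User supplied a key-less mapper: wrap it once at build time.
        ValueMapper<String, Integer> plain = String::length;
        ValueMapperWithKey<String, String, Integer> wrapped = withKey(plain);
        MapValuesProcessor<String, String, Integer> p1 = new MapValuesProcessor<>(wrapped);

        // User supplied a key-aware mapper: use it directly.
        ValueMapperWithKey<String, String, String> keyed = (k, v) -> k + ":" + v;
        MapValuesProcessor<String, String, String> p2 = new MapValuesProcessor<>(keyed);

        System.out.println(p1.process("k", "hello")); // prints 5
        System.out.println(p2.process("k", "hello")); // prints k:hello
    }
}
```

The cost for the common key-less case is one extra nested call per record, which is the trade-off against an `if(isRichFunction)` check raised earlier in the thread; the sketch does not measure either.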