Hi,

I was reading the updated KIP and am wondering whether we should approach the design a little differently.
Instead of distinguishing between a RichFunction and a non-RichFunction at the runtime level, we would use RichFunctions all the time. Thus, at the DSL entry level, if a user provides a non-RichFunction, we wrap it in a RichFunction that is fully implemented by Streams. This RichFunction would just forward the call, omitting the key.

Just to sketch the idea (incomplete code snippet):

> public class StreamsRichValueMapper implements RichValueMapper {
>     private ValueMapper userProvidedMapper; // set by constructor
>
>     public VR apply(final K key, final V value) {
>         return userProvidedMapper.apply(value);
>     }
> }

From a performance point of view, I am not sure whether the "if(isRichFunction)" check, including casts etc., would have more overhead than this approach (we would add one more nested method call for non-RichFunctions, which should be more common than RichFunctions).

This approach should not affect lambdas (or am I missing something?) and might be cleaner, as we could have one more top-level interface `RichFunction` with methods `init()` and `close()`, and also interfaces for `RichValueMapper` etc. (thus, no abstract classes required).

Any thoughts?

-Matthias

On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> Hi,
>
> Thanks for comments. I extended PR and KIP to include rich functions. I
> will still have to evaluate the cost of deep copying of keys.
>
> Cheers,
> Jeyhun
>
> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
> wrote:
>
>> Hey Matthias,
>>
>> My opinion would be that documenting the immutability of the key is the
>> best approach available. Better than requiring the key to be serializable
>> (as with Jeyhun's last pass at the PR), no performance risk.
>>
>> It'd be different if Java had immutable type constraints of some kind. :-)
>>
>> Mathieu
>>
>>
>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Agreed about RichFunction. If we follow this path, it should cover
>>> all(?) DSL interfaces.
>>>
>>> About guarding the key -- I am still not sure what to do about it...
>>> Maybe it might be enough to document it (and name the key parameter like
>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>>> guard against any modification, but I have no good idea how to do this.
>>> Not sure what others think about the risk of corrupted partitioning
>>> (what would be a user error and we could say, well, bad luck, you got a
>>> bug in your code, that's not our fault), vs deep copy with a potential
>>> performance hit (that we can't quantify atm without any performance test).
>>>
>>> We do have a performance system test. Maybe it's worth for you to apply
>>> the deep copy strategy and run the test. It's very basic performance
>>> test only, but might give some insight. If you want to do this, look
>>> into folder "tests" for general test setup, and into
>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>>>
>>>
>>> -Matthias
>>>
>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>> Hi Matthias,
>>>>
>>>> I think extending KIP to include RichFunctions totally makes sense. So,
>>>> we don't want to guard the keys because it is costly.
>>>> If we introduce RichFunctions I think it should not be limited to only the 3
>>>> interfaces proposed in KIP but extended to a wide range of interfaces.
>>>> Please correct me if I am wrong.
>>>>
>>>> Cheers,
>>>> Jeyhun
>>>>
>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> One follow up. There was this email on the user list:
>>>>>
>>>>>
>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>
>>>>> It might make sense to include Initializer, Adder, and Subtractor
>>>>> interface, too.
>>>>>
>>>>> And we should double check if there are other interfaces we might miss atm.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>> Thanks for updating the KIP.
>>>>>>
>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>>>>>> worried about performance impact... We might want to do some test to
>>>>>> quantify this impact.
>>>>>>
>>>>>>
>>>>>> Btw: this reminds me about the idea of `RichFunction` interface that
>>>>>> would allow users to access record metadata (like timestamp, offset,
>>>>>> partition etc) within DSL. This would be a similar concept. Thus, I am
>>>>>> wondering, if it would make sense to enlarge the scope of this KIP by
>>>>>> that? WDYT?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>> Hi Mathieu,
>>>>>>>
>>>>>>> Thanks for feedback. I followed similar approach and updated PR and KIP
>>>>>>> accordingly. I tried to guard the key in Processors sending a copy of an
>>>>>>> actual key.
>>>>>>> Because I am doing deep copy of an object, I think memory can be bottleneck
>>>>>>> in some use-cases.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jeyhun,
>>>>>>>>
>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
>>>>>>>> interfaces, which is also a backwards incompatible change. An alternative
>>>>>>>> approach that would be backwards compatible would be to define new
>>>>>>>> interfaces, and provide overrides where those interfaces are used.
>>>>>>>>
>>>>>>>> Unfortunately, making the key parameter as "final" doesn't change much
>>>>>>>> about guarding against key change. It only prevents the parameter variable
>>>>>>>> from being reassigned. If the key type is a mutable object (eg. byte[]),
>>>>>>>> it can still be mutated. (eg. key[0] = 0). But I'm not really sure there's
>>>>>>>> much that can be done about that.
>>>>>>>>
>>>>>>>> Mathieu
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for comments.
>>>>>>>>>
>>>>>>>>> The concerns makes sense. Although we can guard for immutable keys in
>>>>>>>>> current implementation (with few changes), I didn't consider backward
>>>>>>>>> compatibility.
>>>>>>>>>
>>>>>>>>> In this case 2 solutions come to my mind. In both cases, user accesses the
>>>>>>>>> key in Object type, as passing extra type parameter will break
>>>>>>>>> backwards-compatibility. So user has to cast to actual key type.
>>>>>>>>>
>>>>>>>>> 1. Firstly, We can overload apply method with 2 argument (key and value)
>>>>>>>>> and force key to be *final*. By doing this, I think we can address both
>>>>>>>>> backward-compatibility and guarding against key change.
>>>>>>>>>
>>>>>>>>> 2. Secondly, we can create class KeyAccess like:
>>>>>>>>>
>>>>>>>>> public class KeyAccess {
>>>>>>>>>     Object key;
>>>>>>>>>     public void beforeApply(final Object key) {
>>>>>>>>>         this.key = key;
>>>>>>>>>     }
>>>>>>>>>     public Object getKey() {
>>>>>>>>>         return key;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
>>>>>>>>> *KeyAccess*. Inside processor (for example *KTableMapValuesProcessor*)
>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
>>>>>>>>> *mapper.beforeApply(key)*. As a result, user can use *getKey()* to access
>>>>>>>>> the key inside *apply(value)* method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Jeyhun,
>>>>>>>>>>
>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>
>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>
>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain
>>>>>>>>>> about this in past releases already, and as the user base grows, we
>>>>>>>>>> should not break backward compatibility in future releases anymore.
>>>>>>>>>> Thus, we should think of a better way to allow key access.
>>>>>>>>>>
>>>>>>>>>> Mathieu's comment goes into the same direction
>>>>>>>>>>
>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>> fixed from this change is unfortunate. :-)
>>>>>>>>>>
>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
>>>>>>>>>> allowed to modify the provided key object) and thus break the
>>>>>>>>>> application. (This was the original motivation to not provide the key in
>>>>>>>>>> the first place -- it guards against modification.)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>
>>>>>>>>>>> I just want to add my voice that, I too, have wished for access to the
>>>>>>>>>>> record key during a mapValues or similar operation.
>>>>>>>>>>>
>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>> fixed from this change is unfortunate. :-) But at least it would all be a
>>>>>>>>>>> pretty clear and easy change.
>>>>>>>>>>>
>>>>>>>>>>> Mathieu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
>>>>>>>>>>>> KAFKA-3745 [4]. The related PR can be found at [5].
>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun
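
To make the wrapping idea from the top of this thread a bit more concrete, here is a minimal, self-contained sketch. It is only an illustration under stated assumptions: the interfaces are simplified stand-ins declared locally rather than the actual Kafka Streams types, and `RichFunction`, `RichValueMapper`, `StreamsRichValueMapper`, the `readOnlyKey` parameter name, and the `wrap` helper are hypothetical names taken from or modeled on the discussion, not an existing API.

```java
import java.util.Objects;

public class RichWrapperSketch {

    /** Simplified stand-in for the existing key-less DSL interface (assumption). */
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    /** Hypothetical top-level interface carrying the lifecycle hooks. */
    interface RichFunction {
        void init();
        void close();
    }

    /** Hypothetical key-aware variant; the key is meant to be read-only. */
    interface RichValueMapper<K, V, VR> extends RichFunction {
        VR apply(K readOnlyKey, V value);
    }

    /** Adapter owned by the framework: forwards the call and drops the key. */
    static final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
        private final ValueMapper<V, VR> userProvidedMapper;

        StreamsRichValueMapper(final ValueMapper<V, VR> userProvidedMapper) {
            this.userProvidedMapper = Objects.requireNonNull(userProvidedMapper);
        }

        @Override
        public void init() { /* nothing to set up for a plain mapper */ }

        @Override
        public void close() { /* nothing to release for a plain mapper */ }

        @Override
        public VR apply(final K readOnlyKey, final V value) {
            // The key is intentionally ignored; only the value reaches user code.
            return userProvidedMapper.apply(value);
        }
    }

    /** DSL entry point (hypothetical): wrap once, so the runtime only sees rich functions. */
    static <K, V, VR> RichValueMapper<K, V, VR> wrap(final ValueMapper<V, VR> mapper) {
        return new StreamsRichValueMapper<>(mapper);
    }

    public static void main(final String[] args) {
        // A lambda still works as the user-facing plain mapper.
        final RichValueMapper<String, Integer, Integer> doubled = wrap(v -> v * 2);
        doubled.init();
        System.out.println(doubled.apply("ignored-key", 21)); // prints 42
        doubled.close();
    }
}
```

With this shape the runtime never needs an "is this a rich function" check; the cost is one extra method call per record for plain mappers, which is the trade-off raised above and would still need to be measured.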