Hi,

I was reading the updated KIP and I am wondering whether we should do the
design a little differently.

Instead of distinguishing between a RichFunction and a non-RichFunction at
the runtime level, we would use RichFunctions all the time. Thus, at the DSL
entry level, if a user provides a non-RichFunction, we wrap it in a
RichFunction that is fully implemented by Streams. This RichFunction
would just forward the call, omitting the key.

Just to sketch the idea (incomplete code snippet):

> public class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
>   private ValueMapper<V, VR> userProvidedMapper; // set by constructor
> 
>   public VR apply(final K key, final V value) {
>       // forward to the user's mapper, dropping the key
>       return userProvidedMapper.apply(value);
>   }
> }


From a performance point of view, I am not sure whether the
"if(isRichFunction)" check, including casts etc., would have more overhead
than this approach (we would do one more nested method call for
non-RichFunctions, which should be more common than RichFunctions).
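
To make the comparison concrete, here is a rough sketch of the two dispatch
styles. The interfaces are simplified stand-ins, not the actual Streams
classes, and `DispatchSketch`, `processWithCheck`, `wrap` and
`processWrapped` are names made up for illustration only. It says nothing
about the actual cost; that would need a benchmark.

```java
public class DispatchSketch {
    // Simplified stand-ins for the DSL interfaces (assumed shapes, not the real Streams API).
    public interface ValueMapper<V, VR> { VR apply(V value); }
    public interface RichValueMapper<K, V, VR> { VR apply(K key, V value); }

    // Style A: the processor holds an untyped reference and checks it for every record.
    @SuppressWarnings("unchecked")
    public static <K, V, VR> VR processWithCheck(Object mapper, K key, V value) {
        if (mapper instanceof RichValueMapper) {
            return ((RichValueMapper<K, V, VR>) mapper).apply(key, value);
        }
        return ((ValueMapper<V, VR>) mapper).apply(value);
    }

    // Style B: a plain mapper is wrapped once, when the topology is built...
    public static <K, V, VR> RichValueMapper<K, V, VR> wrap(ValueMapper<V, VR> mapper) {
        return (key, value) -> mapper.apply(value); // the key is simply dropped
    }

    // ...so the processor only ever sees the rich interface (one extra nested call).
    public static <K, V, VR> VR processWrapped(RichValueMapper<K, V, VR> mapper, K key, V value) {
        return mapper.apply(key, value);
    }

    public static void main(String[] args) {
        ValueMapper<String, Integer> plain = String::length;
        RichValueMapper<String, String, String> rich = (key, value) -> key + ":" + value;

        Integer a = processWithCheck(plain, "k", "abc");
        String b = processWithCheck(rich, "k", "abc");
        Integer c = processWrapped(wrap(plain), "k", "abc");
        System.out.println(a + " " + b + " " + c); // prints: 3 k:abc 3
    }
}
```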

This approach should not affect lambdas (or am I missing something?) and
might be cleaner, as we could have one more top-level interface
`RichFunction` with methods `init()` and `close()`, and also interfaces
for `RichValueMapper` etc. (thus, no abstract classes required).
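
A slightly fuller sketch of how the interfaces could fit together, under the
assumption that `init()` and `close()` take no arguments (the KIP may well
define them differently). `RichWrapperSketch` and the `mapValues` helper are
hypothetical and only stand in for a DSL entry point:

```java
public class RichWrapperSketch {
    // Assumed top-level interface; the real signatures are up to the KIP.
    public interface RichFunction {
        void init();
        void close();
    }

    // Existing single-method interface stays as is, so lambdas keep working.
    public interface ValueMapper<V, VR> { VR apply(V value); }

    public interface RichValueMapper<K, V, VR> extends RichFunction {
        VR apply(K key, V value);
    }

    // Wrapper fully implemented by the library: no-op lifecycle, key is dropped.
    public static final class StreamsRichValueMapper<K, V, VR> implements RichValueMapper<K, V, VR> {
        private final ValueMapper<V, VR> userProvidedMapper;

        public StreamsRichValueMapper(ValueMapper<V, VR> userProvidedMapper) {
            this.userProvidedMapper = userProvidedMapper;
        }

        @Override public void init() { }
        @Override public void close() { }

        @Override
        public VR apply(K key, V value) {
            return userProvidedMapper.apply(value);
        }
    }

    // Hypothetical DSL entry point: wraps the plain mapper exactly once.
    public static <K, V, VR> RichValueMapper<K, V, VR> mapValues(ValueMapper<V, VR> mapper) {
        return new StreamsRichValueMapper<>(mapper);
    }

    public static void main(String[] args) {
        RichValueMapper<String, String, String> rich = mapValues((String v) -> v.toUpperCase());
        rich.init();
        System.out.println(rich.apply("ignored-key", "hello")); // prints: HELLO
        rich.close();
    }
}
```

Note that `RichValueMapper` itself could not be written as a lambda here,
since it has three abstract methods; only the plain `ValueMapper` can.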


Any thoughts?


-Matthias


On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
> Hi,
> 
> Thanks for the comments. I extended the PR and KIP to include rich functions.
> I will still have to evaluate the cost of deep copying keys.
> 
> Cheers,
> Jeyhun
> 
> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
> wrote:
> 
>> Hey Matthias,
>>
>> My opinion would be that documenting the immutability of the key is the
>> best approach available.  Better than requiring the key to be serializable
>> (as with Jeyhun's last pass at the PR), no performance risk.
>>
>> It'd be different if Java had immutable type constraints of some kind. :-)
>>
>> Mathieu
>>
>>
>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Agreed about RichFunction. If we follow this path, it should cover
>>> all(?) DSL interfaces.
>>>
>>> About guarding the key -- I am still not sure what to do about it...
>>> Maybe it might be enough to document it (and name the key parameter like
>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>>> guard against any modification, but I have no good idea how to do this.
>>> Not sure what others think about the risk of corrupted partitioning
>>> (which would be a user error and we could say, well, bad luck, you got a
>>> bug in your code, that's not our fault), vs deep copy with a potential
>>> performance hit (that we can't quantify atm without any performance
>>> test).
>>>
>>> We do have a performance system test. Maybe it's worth applying
>>> the deep copy strategy and running the test. It's a very basic performance
>>> test only, but might give some insight. If you want to do this, look
>>> into the folder "tests" for the general test setup, and into
>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>>>
>>>
>>> -Matthias
>>>
>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>> Hi Matthias,
>>>>
>>>> I think extending the KIP to include RichFunctions totally makes sense.
>>>> So, we don't want to guard the keys because it is costly.
>>>> If we introduce RichFunctions, I think they should not be limited to only
>>>> the 3 interfaces proposed in the KIP but should cover a wide range of
>>>> interfaces.
>>>> Please correct me if I am wrong.
>>>>
>>>> Cheers,
>>>> Jeyhun
>>>>
>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> One follow up. There was this email on the user list:
>>>>>
>>>>>
>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>
>>>>> It might make sense to include the Initializer, Adder, and Subtractor
>>>>> interfaces, too.
>>>>>
>>>>> And we should double check if there are other interfaces we might be
>>>>> missing atm.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>> Thanks for updating the KIP.
>>>>>>
>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>>>>>> worried about the performance impact... We might want to do some tests to
>>>>>> quantify this impact.
>>>>>>
>>>>>>
>>>>>> Btw: this reminds me of the idea of a `RichFunction` interface that
>>>>>> would allow users to access record metadata (like timestamp, offset,
>>>>>> partition etc) within the DSL. This would be a similar concept. Thus, I am
>>>>>> wondering if it would make sense to enlarge the scope of this KIP by
>>>>>> that? WDYT?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>> Hi Mathieu,
>>>>>>>
>>>>>>> Thanks for the feedback. I followed a similar approach and updated the PR
>>>>>>> and KIP accordingly. I tried to guard the key in Processors by sending a
>>>>>>> copy of the actual key.
>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
>>>>>>> bottleneck in some use-cases.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <mathieu.fenn...@replicon.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jeyhun,
>>>>>>>>
>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
>>>>>>>> interfaces, which is also a backwards incompatible change.  An alternative
>>>>>>>> approach that would be backwards compatible would be to define new
>>>>>>>> interfaces, and provide overloads where those interfaces are used.
>>>>>>>>
>>>>>>>> Unfortunately, marking the key parameter as "final" doesn't change much
>>>>>>>> about guarding against key change.  It only prevents the parameter variable
>>>>>>>> from being reassigned.  If the key type is a mutable object (eg. byte[]),
>>>>>>>> it can still be mutated (eg. key[0] = 0).  But I'm not really sure there's
>>>>>>>> much that can be done about that.
>>>>>>>>
>>>>>>>> Mathieu
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for comments.
>>>>>>>>>
>>>>>>>>> The concerns make sense. Although we can guard for immutable keys in
>>>>>>>>> the current implementation (with a few changes), I didn't consider backward
>>>>>>>>> compatibility.
>>>>>>>>>
>>>>>>>>> In this case, 2 solutions come to my mind. In both cases, the user accesses
>>>>>>>>> the key as an Object, as passing an extra type parameter would break
>>>>>>>>> backwards-compatibility.  So the user has to cast to the actual key type.
>>>>>>>>>
>>>>>>>>> 1. Firstly, we can overload the apply method with 2 arguments (key and value)
>>>>>>>>> and force the key to be *final*. By doing this, I think we can address both
>>>>>>>>> backward-compatibility and guarding against key change.
>>>>>>>>>
>>>>>>>>> 2. Secondly, we can create class KeyAccess like:
>>>>>>>>>
>>>>>>>>> public class KeyAccess {
>>>>>>>>>     Object key;
>>>>>>>>>     public void beforeApply(final Object key) {
>>>>>>>>>         this.key = key;
>>>>>>>>>     }
>>>>>>>>>     public Object getKey() {
>>>>>>>>>         return key;
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
>>>>>>>>> *KeyAccess*. Inside the processor (for example *KTableMapValuesProcessor*),
>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
>>>>>>>>> *mapper.beforeApply(key)*. As a result, the user can use *getKey()* to access
>>>>>>>>> the key inside the *apply(value)* method.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Jeyhun,
>>>>>>>>>>
>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>
>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>
>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain
>>>>>>>>>> about this in past releases already, and as the user base grows, we
>>>>>>>>>> should not break backward compatibility in future releases anymore.
>>>>>>>>>> Thus, we should think of a better way to allow key access.
>>>>>>>>>>
>>>>>>>>>> Mathieu's comment goes in the same direction:
>>>>>>>>>>
>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>> fixed from this change is unfortunate. :-)
>>>>>>>>>>
>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
>>>>>>>>>> allowed to modify the provided key object) and thus break the
>>>>>>>>>> application. (This was the original motivation to not provide the key in
>>>>>>>>>> the first place -- it guards against modification.)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>
>>>>>>>>>>> I just want to add my voice that I, too, have wished for access to the
>>>>>>>>>>> record key during a mapValues or similar operation.
>>>>>>>>>>>
>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>> fixed from this change is unfortunate. :-)  But at least it would all be a
>>>>>>>>>>> pretty clear and easy change.
>>>>>>>>>>>
>>>>>>>>>>> Mathieu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
>>>>>>>>>>>> KAFKA-3745 [4]. The related PR can be found at [5].
>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> -Cheers
>>>>>>>>>
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>> -Cheers
>>>>
>>>> Jeyhun
>>>>
>>>
>>>
>>
