If `ValueMapperWithKey` extends `ValueMapper` we don't need the new
overload.

And yes, we need to do one check -- but this happens when building the
topology. At runtime (I mean after KafkaStreams#start()) we don't need any
check, as we will always use `ValueMapperWithKey`.
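
To sketch the idea: the interfaces below are simplified stand-ins written
for illustration only, not the actual Streams API, and the helper name
`withKey` is hypothetical. It assumes `ValueMapperWithKey` extends
`ValueMapper`, so one `mapValues(ValueMapper)` signature accepts both, and
the only type check happens once at topology-build time:

```java
public class KeyAwareMapperSketch {

    /** Stand-in for the existing value-only mapper interface. */
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    /** Hypothetical key-aware variant that extends the value-only interface. */
    interface ValueMapperWithKey<K, V, VR> extends ValueMapper<V, VR> {
        VR apply(K readOnlyKey, V value);

        // The default leaves exactly one abstract method, so lambdas still work.
        @Override
        default VR apply(V value) {
            throw new UnsupportedOperationException("call apply(readOnlyKey, value) instead");
        }
    }

    /** The single check, done once while the topology is being built. */
    @SuppressWarnings("unchecked")
    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(
            final ValueMapper<? super V, ? extends VR> mapper) {
        if (mapper instanceof ValueMapperWithKey) {
            // Already key-aware: use it as-is.
            return (ValueMapperWithKey<K, V, VR>) mapper;
        }
        // Plain mapper: wrap it and forward the call, omitting the key.
        return (readOnlyKey, value) -> mapper.apply(value);
    }

    public static void main(String[] args) {
        ValueMapper<String, Integer> plain = String::length;
        ValueMapperWithKey<String, String, String> keyed = (key, value) -> key + ":" + value;

        ValueMapperWithKey<String, String, Integer> wrapped = withKey(plain);
        ValueMapperWithKey<String, String, String> same = withKey(keyed);

        System.out.println(wrapped.apply("k", "abc"));
        System.out.println(same.apply("k", "abc"));
    }
}
```

After `withKey`, the processor only ever sees a `ValueMapperWithKey` and
calls `apply(key, value)` without any per-record `instanceof` check.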


-Matthias


On 5/9/17 2:55 AM, Jeyhun Karimov wrote:
> Hi,
> 
> Thanks for feedback.
> Then we need to overload method
>   <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR>
> mapper);
> with
>   <VR> KStream<K, VR> mapValues(ValueMapperWithKey<? super K, ? super V,
> ? extends VR> mapper);
> 
> and at runtime (inside the processor) we still have to check whether it is a
> ValueMapper or a ValueMapperWithKey before wrapping it into the rich function.
> 
> 
> Please correct me if I am wrong.
> 
> Cheers,
> Jeyhun
> 
> 
> 
> 
> On Tue, May 9, 2017 at 10:56 AM Michal Borowiecki <
> michal.borowie...@openbet.com> wrote:
> 
>> +1 :)
>>
>>
>> On 08/05/17 23:52, Matthias J. Sax wrote:
>>> Hi,
>>>
>>> I was reading the updated KIP and I am wondering if we should do the
>>> design a little differently.
>>>
>>> Instead of distinguishing between a RichFunction and non-RichFunction at
>>> runtime level, we would use RichFunctions all the time. Thus, on the DSL
>>> entry level, if a user provides a non-RichFunction, we wrap it by a
>>> RichFunction that is fully implemented by Streams. This RichFunction
>>> would just forward the call omitting the key:
>>>
>>> Just to sketch the idea (incomplete code snippet):
>>>
>>>> public class StreamsRichValueMapper implements RichValueMapper {
>>>>    private ValueMapper userProvidedMapper; // set by constructor
>>>>
>>>>    public VR apply(final K key, final V value) {
>>>>        // forward to the user's mapper, omitting the key
>>>>        return userProvidedMapper.apply(value);
>>>>    }
>>>> }
>>>
>>>  From a performance point of view, I am not sure if the
>>> "if(isRichFunction)" including casts etc would have more overhead than
>>> this approach (we would do more nested method calls for non-RichFunctions,
>>> which should be more common than RichFunctions).
>>>
>>> This approach should not affect lambdas (or am I missing something?) and
>>> might be cleaner, as we could have one more top level interface
>>> `RichFunction` with methods `init()` and `close()` and also interfaces
>>> for `RichValueMapper` etc. (thus, no abstract classes required).
>>>
>>>
>>> Any thoughts?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 5/6/17 5:29 PM, Jeyhun Karimov wrote:
>>>> Hi,
>>>>
>>>> Thanks for comments. I extended PR and KIP to include rich functions. I
>>>> will still have to evaluate the cost of deep copying of keys.
>>>>
>>>> Cheers,
>>>> Jeyhun
>>>>
>>>> On Fri, May 5, 2017 at 8:02 PM Mathieu Fenniak <
>> mathieu.fenn...@replicon.com>
>>>> wrote:
>>>>
>>>>> Hey Matthias,
>>>>>
>>>>> My opinion would be that documenting the immutability of the key is the
>>>>> best approach available.  Better than requiring the key to be
>>>>> serializable (as with Jeyhun's last pass at the PR), no performance risk.
>>>>>
>>>>> It'd be different if Java had immutable type constraints of some kind. :-)
>>>>>
>>>>> Mathieu
>>>>>
>>>>>
>>>>> On Fri, May 5, 2017 at 11:31 AM, Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Agreed about RichFunction. If we follow this path, it should cover
>>>>>> all(?) DSL interfaces.
>>>>>>
>>>>>> About guarding the key -- I am still not sure what to do about it...
>>>>>> Maybe it might be enough to document it (and name the key parameter like
>>>>>> `readOnlyKey` to make it very clear). Ultimately, I would prefer to
>>>>>> guard against any modification, but I have no good idea how to do this.
>>>>>> Not sure what others think about the risk of corrupted partitioning
>>>>>> (which would be a user error and we could say, well, bad luck, you got a
>>>>>> bug in your code, that's not our fault), vs deep copy with a potential
>>>>>> performance hit (that we can't quantify atm without any performance test).
>>>>>> We do have a performance system test. Maybe it's worth it for you to apply
>>>>>> the deep copy strategy and run the test. It's a very basic performance
>>>>>> test only, but might give some insight. If you want to do this, look
>>>>>> into folder "tests" for general test setup, and into
>>>>>> "tests/kafkatest/benchmarks/streams" to find the perf test.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/5/17 8:55 AM, Jeyhun Karimov wrote:
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> I think extending the KIP to include RichFunctions totally makes sense.
>>>>>>> So, we don't want to guard the keys because it is costly.
>>>>>>> If we introduce RichFunctions I think it should not be limited to only the
>>>>>>> 3 interfaces proposed in the KIP but cover a wide range of interfaces.
>>>>>>> Please correct me if I am wrong.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Fri, May 5, 2017 at 12:04 AM Matthias J. Sax <
>> matth...@confluent.io
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One follow up. There was this email on the user list:
>>>>>>>>
>>>>>>>>
>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND17KhCaBzPSZ1?subj=Shouldn+t+the+initializer+of+a+stream+aggregate+accept+the+key+
>>>>>>>> It might make sense to include the Initializer, Adder, and Subtractor
>>>>>>>> interfaces, too.
>>>>>>>>
>>>>>>>> And we should double check if there are other interfaces we might miss atm.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>> On 5/4/17 1:31 PM, Matthias J. Sax wrote:
>>>>>>>>> Thanks for updating the KIP.
>>>>>>>>>
>>>>>>>>> Deep copying the key will work for sure, but I am actually a little bit
>>>>>>>>> worried about performance impact... We might want to do some test to
>>>>>>>>> quantify this impact.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Btw: this reminds me about the idea of a `RichFunction` interface that
>>>>>>>>> would allow users to access record metadata (like timestamp, offset,
>>>>>>>>> partition etc) within DSL. This would be a similar concept. Thus, I am
>>>>>>>>> wondering if it would make sense to enlarge the scope of this KIP by
>>>>>>>>> that? WDYT?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/3/17 2:08 AM, Jeyhun Karimov wrote:
>>>>>>>>>> Hi Mathieu,
>>>>>>>>>>
>>>>>>>>>> Thanks for feedback. I followed a similar approach and updated PR and KIP
>>>>>>>>>> accordingly. I tried to guard the key in Processors by sending a copy of
>>>>>>>>>> the actual key.
>>>>>>>>>> Because I am doing a deep copy of an object, I think memory can be a
>>>>>>>>>> bottleneck in some use-cases.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> On Tue, May 2, 2017 at 5:10 PM Mathieu Fenniak <
>>>>>>>> mathieu.fenn...@replicon.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>
>>>>>>>>>>> This approach would change ValueMapper (...etc) to be classes, rather than
>>>>>>>>>>> interfaces, which is also a backwards incompatible change.  An alternative
>>>>>>>>>>> approach that would be backwards compatible would be to define new
>>>>>>>>>>> interfaces, and provide overrides where those interfaces are used.
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, making the key parameter as "final" doesn't change much
>>>>>>>>>>> about guarding against key change.  It only prevents the parameter variable
>>>>>>>>>>> from being reassigned.  If the key type is a mutable object (eg. byte[]),
>>>>>>>>>>> it can still be mutated. (eg. key[0] = 0).  But I'm not really sure there's
>>>>>>>>>>> much that can be done about that.
>>>>>>>>>>>
>>>>>>>>>>> Mathieu
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 1, 2017 at 5:39 PM, Jeyhun Karimov <
>>>>> je.kari...@gmail.com
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for comments.
>>>>>>>>>>>>
>>>>>>>>>>>> The concerns make sense. Although we can guard for immutable keys in
>>>>>>>>>>>> current implementation (with few changes), I didn't consider backward
>>>>>>>>>>>> compatibility.
>>>>>>>>>>>>
>>>>>>>>>>>> In this case 2 solutions come to my mind. In both cases, user accesses the
>>>>>>>>>>>> key in Object type, as passing extra type parameter will break
>>>>>>>>>>>> backwards-compatibility.  So user has to cast to actual key type.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Firstly, we can overload apply method with 2 arguments (key and value)
>>>>>>>>>>>> and force key to be *final*. By doing this, I think we can address both
>>>>>>>>>>>> backward-compatibility and guarding against key change.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Secondly, we can create class KeyAccess like:
>>>>>>>>>>>>
>>>>>>>>>>>> public class KeyAccess {
>>>>>>>>>>>>      Object key;
>>>>>>>>>>>>      public void beforeApply(final Object key) {
>>>>>>>>>>>>          this.key = key;
>>>>>>>>>>>>      }
>>>>>>>>>>>>      public Object getKey() {
>>>>>>>>>>>>          return key;
>>>>>>>>>>>>      }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> We can extend *ValueMapper, ValueJoiner* and *ValueTransformer* from
>>>>>>>>>>>> *KeyAccess*. Inside processor (for example *KTableMapValuesProcessor*)
>>>>>>>>>>>> before calling *mapper.apply(value)* we can set the *key* by
>>>>>>>>>>>> *mapper.beforeApply(key)*. As a result, user can use *getKey()* to access
>>>>>>>>>>>> the key inside *apply(value)* method.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, May 1, 2017 at 7:24 PM Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>
>>>>>>>>>>>>> thanks a lot for the KIP!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think there are two issues we need to address:
>>>>>>>>>>>>>
>>>>>>>>>>>>> (1) The KIP does not consider backward compatibility. Users did complain
>>>>>>>>>>>>> about this in past releases already, and as the user base grows, we
>>>>>>>>>>>>> should not break backward compatibility in future releases anymore.
>>>>>>>>>>>>> Thus, we should think of a better way to allow key access.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mathieu's comment goes into the same direction
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)
>>>>>>>>>>>>>
>>>>>>>>>>>>> (2) Another concern is that there is no guard to prevent user code from
>>>>>>>>>>>>> modifying the key. This might corrupt partitioning if users do alter the
>>>>>>>>>>>>> key (accidentally -- or users are just not aware that they are not
>>>>>>>>>>>>> allowed to modify the provided key object) and thus break the
>>>>>>>>>>>>> application. (This was the original motivation to not provide the key in
>>>>>>>>>>>>> the first place -- it guards against modification.)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5/1/17 6:31 AM, Mathieu Fenniak wrote:
>>>>>>>>>>>>>> Hi Jeyhun,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I just want to add my voice that, I too, have wished for access to the
>>>>>>>>>>>>>> record key during a mapValues or similar operation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On the other hand, the number of compile failures that would need to be
>>>>>>>>>>>>>> fixed from this change is unfortunate. :-)  But at least it would all be a
>>>>>>>>>>>>>> pretty clear and easy change.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Mathieu
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, May 1, 2017 at 6:55 AM, Jeyhun Karimov <
>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I want to share KIP-149 [1] based on issues KAFKA-4218 [2], KAFKA-4726 [3],
>>>>>>>>>>>>>>> KAFKA-3745 [4]. The related PR can be found at [5].
>>>>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
>>>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/KAFKA-4218
>>>>>>>>>>>>>>> [3] https://issues.apache.org/jira/browse/KAFKA-4726
>>>>>>>>>>>>>>> [4] https://issues.apache.org/jira/browse/KAFKA-3745
>>>>>>>>>>>>>>> [5] https://github.com/apache/kafka/pull/2946
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>> --
>>>>>>> -Cheers
>>>>>>>
>>>>>>> Jeyhun
>>>>>>>
>>>>>>
>>
>> --
> -Cheers
> 
> Jeyhun
> 
