Thanks for updating the KIP.

I have one concern with regard to backward compatibility. You suggest
using RecordContext as the base interface for ProcessorContext. This will
break compatibility.

I think we should just have two independent interfaces. Our own
ProcessorContextImpl class would implement both. This allows us to cast
it to `RecordContext` and thus limit the visible scope.
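
A minimal sketch of this idea, assuming trimmed-down stand-ins for the real
interfaces: the method sets shown here, the constructor, and the
`recordView` helper are hypothetical and only illustrate the shape, not the
actual Kafka Streams code.

```java
// Hypothetical, trimmed-down stand-ins for the real Kafka Streams types;
// the method sets here are illustrative assumptions only.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// ProcessorContext is left untouched and does NOT extend RecordContext.
interface ProcessorContext {
    String applicationId();
    String topic();
    void commit();
}

// The internal class implements both independent interfaces.
final class ProcessorContextImpl implements ProcessorContext, RecordContext {
    private final String applicationId;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private boolean commitRequested = false;

    ProcessorContextImpl(String applicationId, String topic, int partition,
                         long offset, long timestamp) {
        this.applicationId = applicationId;
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String applicationId() { return applicationId; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public void commit() { commitRequested = true; }

    boolean commitRequested() { return commitRequested; }
}

final class ContextDemo {
    // Hypothetical helper: hands user code the narrow view only.
    static RecordContext recordView(ProcessorContextImpl context) {
        return context; // plain upcast, no copy is made
    }

    public static void main(String[] args) {
        ProcessorContextImpl impl =
            new ProcessorContextImpl("my-app", "input-topic", 3, 42L, 1_000L);
        RecordContext view = recordView(impl);
        System.out.println(view.topic() + "-" + view.partition() + "@" + view.offset());
        // view.commit(); // would not compile: commit() is not part of RecordContext
    }
}
```

Code that only receives the `RecordContext` view cannot reach `commit()` or
other ProcessorContext methods without an explicit downcast, which is the
"limited visible scope" meant above.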


-Matthias



On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> Hi all,
> 
> I updated the KIP w.r.t. the discussion and comments.
> Basically, I eliminated the overloads for a particular method if there are more
> than 3 of them.
> As we can see, there are a lot of overloads (and more will come with KIP-149
> :) )
> So, is it wise to
> wait for the result of the constructive DSL thread, or
> extend the KIP to address this issue as well, or
> continue as it is?
> 
> Cheers,
> Jeyhun
> 
> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> LGTM. Thanks!
>>
>>
>> Guozhang
>>
>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com>
>> wrote:
>>
>>> Thanks for the comment Matthias. After all the discussion (thanks to all
>>> participants), I think this (single method that passes in a RecordContext
>>> object) is the best alternative.
>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into the
>>> KIP by adding a related method to the RecordContext interface.
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to push this discussion further. It seems we got nice
>>>> alternatives (thanks for the summary Jeyhun!).
>>>>
>>>> With respect to RichFunctions and allowing them to be stateful, I have
>>>> my doubts, as expressed already. From my understanding, the idea was to
>>>> give access to record metadata information only. If you want to do a
>>>> stateful computation you should rather use #transform().
>>>>
>>>> Furthermore, as pointed out, we would need to switch to a
>>>> supplier-pattern introducing many more overloads.
>>>>
>>>> For those reasons, I advocate for a simple interface with a single method
>>>> that passes in a RecordContext object.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
>>>>> Thanks for the comprehensive summary!
>>>>>
>>>>> Personally I'd prefer the option of passing RecordContext as an additional
>>>>> parameter into the overloaded function. But I'm also open to other arguments
>>>>> if there is something that I have overlooked.
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for your comments Matthias and Guozhang.
>>>>>>
>>>>>> Below is a quick summary of the main alternatives we looked at to
>>>>>> introduce the Rich functions (I will refer to them as Rich functions until we
>>>>>> find a better name). The initially proposed alternatives were not
>>>>>> backwards-compatible, so I will not mention them.
>>>>>> The related discussions are spread across the KIP-149 and this KIP's (KIP-159)
>>>>>> discussion threads.
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1. The idea of rich functions came up in the KIP-149
>>>>>> discussion thread. As a result we extended KIP-149 to support Rich
>>>>>> functions as well.
>>>>>>
>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext)
>>>>>> method. Afterwards, Damian suggested that we should not provide
>>>>>> ProcessorContext to users. As a result, we separated the two problems into
>>>>>> two separate KIPs, as it seems they can be solved in parallel.
>>>>>>
>>>>>> - One approach we considered was :
>>>>>>
>>>>>> public interface ValueMapperWithKey<K, V, VR> {
>>>>>>     VR apply(final K key, final V value);
>>>>>> }
>>>>>>
>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction{
>>>>>> }
>>>>>>
>>>>>> public interface RichFunction {
>>>>>>     void init(RecordContext recordContext);
>>>>>>     void close();
>>>>>> }
>>>>>>
>>>>>> public interface RecordContext {
>>>>>>     String applicationId();
>>>>>>     TaskId taskId();
>>>>>>     StreamsMetrics metrics();
>>>>>>     String topic();
>>>>>>     int partition();
>>>>>>     long offset();
>>>>>>     long timestamp();
>>>>>>     Map<String, Object> appConfigs();
>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> public interface ProcessorContext extends RecordContext {
>>>>>>    // all methods but the ones in RecordContext
>>>>>> }
>>>>>>
>>>>>> As a result:
>>>>>> * All "withKey" and "withoutKey" interfaces can be converted to their
>>>>>> Rich counterparts (with empty init() and close() methods).
>>>>>> * All related Processors will accept Rich interfaces in their
>>>>>> constructors.
>>>>>> * So, we convert the related "withKey" or "withoutKey" interfaces to Rich
>>>>>> interfaces while building the topology and initialize the related processors
>>>>>> with Rich interfaces only.
>>>>>> * We will not need overloaded methods for rich functions, as Rich
>>>>>> interfaces extend withKey interfaces. We will just check the object type
>>>>>> and act accordingly.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 3. There were some thoughts that the above approach does not support lambdas,
>>>>>> so we should support only one method, init(RecordContext), as part of
>>>>>> Rich interfaces.
>>>>>> This is still in discussion. Personally I think Rich interfaces are by
>>>>>> definition lambda-free and we should not care much about it.
>>>>>>
>>>>>>
>>>>>> 4. Thanks to Matthias's discussion, an alternative we considered was to
>>>>>> pass in the RecordContext as a method parameter. This might even allow us to
>>>>>> use lambdas, and we could keep the name RichFunction as we preserve the
>>>>>> nature of being a function.
>>>>>> "If you go with `init()` and `close()` we basically
>>>>>> allow users to have an in-memory state for a function. Thus, we cannot
>>>>>> share a single instance of RichValueMapper (etc) over multiple tasks and
>>>>>> we would need a supplier pattern similar to #transform(). And this would
>>>>>> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
>>>>>> inherit from ValueMapper and thus we would need many new overload for
>>>>>> KStream/KTable classes". (Copy paste from Matthias's email)
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, we did consider this, and there is no consensus yet on what the best
>>>>>>> alternative is.
>>>>>>>
>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
>>>>>>> summary of the current state of the discussion?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
>>>>>>>> Thanks for the explanation Jeyhun and Matthias.
>>>>>>>>
>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering if you
>>>>>>>> guys have considered a slightly different approach for rich functions, that is
>>>>>>>> to add the `RecordContext` into the apply functions as an additional
>>>>>>>> parameter. For example:
>>>>>>>>
>>>>>>>> ---------------------------
>>>>>>>>
>>>>>>>> interface RichValueMapper<V, VR> {
>>>>>>>>
>>>>>>>> VR apply(final V value, final RecordContext context);
>>>>>>>>
>>>>>>>> }
>>>>>>>>
>>>>>>>> ...
>>>>>>>>
>>>>>>>> // then in KStreams
>>>>>>>>
>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
>>>>>>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ?
>>>>>>>> extends VR> mapper);
>>>>>>>>
>>>>>>>> -------------------------------
>>>>>>>>
>>>>>>>> The caveat is that it will introduce more overloads; but I think the
>>>>>>>> number of overloads is mainly driven by 1) serde overrides and 2)
>>>>>>>> state-store-supplier overrides, both of which can be reduced in the near
>>>>>>>> future, and I felt this overloading is still worthwhile, as it has the
>>>>>>>> following benefits:
>>>>>>>>
>>>>>>>> 1) still allows lambda expressions.
>>>>>>>> 2) clearer code path (no need to "convert" from non-rich functions to
>>>>>>>> rich functions)
>>>>>>>>
>>>>>>>>
>>>>>>>> Maybe this approach has already been discussed and I may have overlooked it in
>>>>>>>> the email thread; anyway, let me know.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API improvement
>>>>>>>>> ideas are overlapping and/or contradicting each other. For this reason,
>>>>>>>>> not all ideas can be accomplished and some Jiras might just be closed as
>>>>>>>>> "won't fix".
>>>>>>>>>
>>>>>>>>> For this reason, we try to do those KIP discussions with a large scope
>>>>>>>>> to get an overall picture and converge to an overall consistent API.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It might
>>>>>>>>> be sufficient though, to do a single xxxWithContext() overload that will
>>>>>>>>> provide key+value+context. Otherwise, it might get too messy having
>>>>>>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
>>>>>>>>> ValueMapperWithKeyWithContext.
>>>>>>>>>
>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API
>>>>>>>>> change and this might mitigate the overload problem. Not for simple
>>>>>>>>> functions like map/flatMap etc. but for joins and aggregations.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally
>>>>>>>>> fine to break the pure functional interface, and add
>>>>>>>>>
>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)` (or
>>>>>>>>> `init(...)`, or any better name) -- but not `close()`
>>>>>>>>>
>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
>>>>>>>>> WithRecordContext
>>>>>>>>>
>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a
>>>>>>>>> "pure function" interface and also sacrifice lambdas.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am personally a little bit undecided what the better option might be.
>>>>>>>>> Curious to hear what others think about this trade-off.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>
>>>>>>>>>> It subsumes it partially. Initially the idea was to support RichFunctions as a
>>>>>>>>>> separate interface. Throughout the discussion, however, we considered that maybe
>>>>>>>>>> overloading the related methods (with a RecordContext param) is a better
>>>>>>>>>> approach than providing a separate RichFunction interface.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Jeyhun
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does this KIP subsume this ticket as well?
>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
>>>>>>>>>>>
>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>
>>>>>>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
>>>>>>>>>>>> KIP for rich functions (interfaces) [2].
>>>>>>>>>>>> I would like to get your comments.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>> --
>>>>>>>>>>>> -Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>> -Cheers
>>>>>>
>>>>>> Jeyhun
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>> -Cheers
>>>
>>> Jeyhun
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
