Hi,

I would like to push this discussion further. It seems we got nice
alternatives (thanks for the summary Jeyhun!).

With respect to RichFunctions and allowing them to be stateful, I have
my doubt as expressed already. From my understanding, the idea was to
give access to record metadata information only. If you want to do a
stateful computation you should rather use #transform().

Furthermore, as pointed out, we would need to switch to a
supplier-pattern introducing many more overloads.

For those reason, I advocate for a simple interface with a single method
that passes in a RecordContext object.


-Matthias


On 6/6/17 5:15 PM, Guozhang Wang wrote:
> Thanks for the comprehensive summary!
> 
> Personally I'd prefer the option of passing RecordContext as an additional
> parameter into he overloaded function. But I'm also open to other arguments
> if there are sth. that I have overlooked.
> 
> Guozhang
> 
> 
> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> 
>> Hi,
>>
>> Thanks for your comments Matthias and Guozhang.
>>
>> Below I mention the quick summary of the main alternatives we looked at to
>> introduce the Rich functions (I will refer to it as Rich functions until we
>> find better/another name). Initially the proposed alternatives was not
>> backwards-compatible, so I will not mention them.
>> The related discussions are spread in KIP-149 and in this KIP (KIP-159)
>> discussion threads.
>>
>>
>>
>> 1. The idea of rich functions came into the stage with KIP-149, in
>> discussion thread. As a result we extended KIP-149 to support Rich
>> functions as well.
>>
>> 2.  To as part of the Rich functions, we provided init (ProcessorContext)
>> method. Afterwards, Dammian suggested that we should not provide
>> ProcessorContext to users. As a result, we separated the two problems into
>> two separate KIPs, as it seems they can be solved in parallel.
>>
>> - One approach we considered was :
>>
>> public interface ValueMapperWithKey<K, V, VR> {
>>     VR apply(final K key, final V value);
>> }
>>
>> public interface RichValueMapper<K, V, VR> extends RichFunction{
>> }
>>
>> public interface RichFunction {
>>     void init(RecordContext recordContext);
>>     void close();
>> }
>>
>> public interface RecordContext {
>>     String applicationId();
>>     TaskId taskId();
>>     StreamsMetrics metrics();
>>     String topic();
>>     int partition();
>>     long offset();
>>     long timestamp();
>>     Map<String, Object> appConfigs();
>>     Map<String, Object> appConfigsWithPrefix(String prefix);
>> }
>>
>>
>> public interface ProcessorContext extends RecordContext {
>>    // all methods but the ones in RecordContext
>> }
>>
>> As a result:
>> * . All "withKey" and "withoutKey" interfaces can be converted to their
>> Rich counterparts (with empty init() and close() methods)
>> *. All related Processors will accept Rich interfaces in their
>> constructors.
>> *. So, we convert the related "withKey" or "withoutKey" interfaces to Rich
>> interface while building the topology and initialize the related processors
>> with Rich interfaces only.
>> *. We will not need to overloaded methods for rich functions as Rich
>> interfaces extend withKey interfaces. We will just check the object type
>> and act accordingly.
>>
>>
>>
>>
>> 3. There was some thoughts that the above approach does not support lambdas
>> so we should support only one method, only init(RecordContext), as part of
>> Rich interfaces.
>> This is still in discussion. Personally I think Rich interfaces are by
>> definition lambda-free and we should not care much about it.
>>
>>
>> 4. Thanks to Matthias's discussion, an alternative we considered was to
>> pass in the RecordContext as method parameter.  This might even allow to
>> use Lambdas and we could keep the name RichFunction as we preserve the
>> nature of being a function.
>> "If you go with `init()` and `close()` we basically
>> allow users to have an in-memory state for a function. Thus, we cannot
>> share a single instance of RichValueMapper (etc) over multiple tasks and
>> we would need a supplier pattern similar to #transform(). And this would
>> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
>> inherit from ValueMapper and thus we would need many new overload for
>> KStream/KTable classes". (Copy paste from Matthias's email)
>>
>>
>> Cheers,
>> Jeyhun
>>
>>
>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Yes, we did consider this, and there is no consensus yet what the best
>>> alternative is.
>>>
>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
>>> summary of the current state of the discussion?
>>>
>>>
>>> -Matthias
>>>
>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
>>>> Thanks for the explanation Jeyhun and Matthias.
>>>>
>>>> I have just read through both KIP-149 and KIP-159 and am wondering if
>> you
>>>> guys have considered a slight different approach for rich function,
>> that
>>> is
>>>> to add the `RecordContext` into the apply functions as an additional
>>>> parameter. For example:
>>>>
>>>> ---------------------------
>>>>
>>>> interface RichValueMapper<V, VR> {
>>>>
>>>> VR apply(final V value, final RecordContext context);
>>>>
>>>> }
>>>>
>>>> ...
>>>>
>>>> // then in KStreams
>>>>
>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR>
>>> mapper);
>>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ?
>>>> extends VR> mapper);
>>>>
>>>> -------------------------------
>>>>
>>>> The caveat is that it will introduces more overloads; but I think the
>>>> #.overloads are mainly introduced by 1) serde overrides and 2)
>>>> state-store-supplier overides, both of which can be reduced in the near
>>>> future, and I felt this overloading is still worthwhile, as it has the
>>>> following benefits:
>>>>
>>>> 1) still allow lambda expressions.
>>>> 2) clearer code path (do not need to "convert" from non-rich functions
>> to
>>>> rich functions)
>>>>
>>>>
>>>> Maybe this approach has already been discussed and I may have
>> overlooked
>>> in
>>>> the email thread; anyways, lmk.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>>
>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <
>> matth...@confluent.io>
>>>> wrote:
>>>>
>>>>> I agree with Jeyhun. As already mention, the overall API improvement
>>>>> ideas are overlapping and/or contradicting each other. For this
>> reason,
>>>>> not all ideas can be accomplished and some Jira might just be closed
>> as
>>>>> "won't fix".
>>>>>
>>>>> For this reason, we try to do those KIP discussion with are large
>> scope
>>>>> to get an overall picture to converge to an overall consisted API.
>>>>>
>>>>>
>>>>> @Jeyhun: about the overloads. Yes, we might get more overload. It
>> might
>>>>> be sufficient though, to do a single xxxWithContext() overload that
>> will
>>>>> provide key+value+context. Otherwise, if might get too messy having
>>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
>>>>> ValueMapperWithKeyWithContext.
>>>>>
>>>>> On the other hand, we also have the "builder pattern" idea as an API
>>>>> change and this might mitigate the overload problem. Not for simple
>>>>> function like map/flatMap etc but for joins and aggregations.
>>>>>
>>>>>
>>>>> On the other hand, as I mentioned in an older email, I am personally
>>>>> fine to break the pure functional interface, and add
>>>>>
>>>>>   - interface WithRecordContext with method `open(RecordContext)` (or
>>>>> `init(...)`, or any better name) -- but not `close()`)
>>>>>
>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
>>>>> WithRecordContext
>>>>>
>>>>> This would allow us to avoid any overload. Of course, we don't get a
>>>>> "pure function" interface and also sacrifices Lambdas.
>>>>>
>>>>>
>>>>>
>>>>> I am personally a little bit undecided what the better option might
>> be.
>>>>> Curious to hear what other think about this trade off.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> It subsumes partially. Initially the idea was to support
>> RichFunctions
>>>>> as a
>>>>>> separate interface. Throughout the discussion, however, we considered
>>>>> maybe
>>>>>> overloading the related methods (with RecodContext param) is better
>>>>>> approach than providing a separate RichFunction interface.
>>>>>>
>>>>>> Cheers,
>>>>>> Jeyhun
>>>>>>
>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>>>>
>>>>>>> Does this KIP subsume this ticket as well?
>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
>>>>>>>
>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <
>> je.kari...@gmail.com
>>>>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Dear community,
>>>>>>>>
>>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to
>>>>> initiate
>>>>>>>> KIP for rich functions (interfaces) [2].
>>>>>>>> I would like to get your comments.
>>>>>>>>
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=
>>>>>>>> Re+DISCUSS+KIP+149+Enabling+key+access+in+
>>>>> ValueTransformer+ValueMapper+
>>>>>>>> and+ValueJoiner
>>>>>>>> [2]
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Jeyhun
>>>>>>>> --
>>>>>>>> -Cheers
>>>>>>>>
>>>>>>>> Jeyhun
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>> --
>> -Cheers
>>
>> Jeyhun
>>
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to