Hi Damian,

Thanks for the comments. About overloads, what other alternatives do we have? For backwards compatibility we have to add extra methods alongside the existing ones.
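To make "adding extra methods alongside the existing ones" concrete, here is a minimal sketch, not the KIP's actual code. ToyStream and fixedContext are hypothetical names invented for illustration; ValueMapper, RichValueMapper and RecordContext are simplified stand-ins shaped after the interfaces quoted further down in this thread, not the real Kafka Streams types. It only shows that a two-argument overload can sit next to the existing one-argument method without touching existing callers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverloadSketch {

    // Existing single-argument function type (simplified stand-in).
    interface ValueMapper<V, VR> {
        VR apply(V value);
    }

    // Read-only record metadata (assumed subset of the proposed interface).
    interface RecordContext {
        String topic();
        int partition();
        long offset();
        long timestamp();
    }

    // "Rich" function type: same job, plus access to the record metadata.
    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext recordContext);
    }

    // Hypothetical toy stand-in for KStream: values that all share one context.
    static final class ToyStream<V> {
        private final List<V> values;
        private final RecordContext context;

        ToyStream(List<V> values, RecordContext context) {
            this.values = values;
            this.context = context;
        }

        // Existing method, left untouched so old callers keep compiling.
        <VR> ToyStream<VR> mapValues(ValueMapper<? super V, ? extends VR> mapper) {
            List<VR> out = new ArrayList<>();
            for (V v : values) {
                out.add(mapper.apply(v));
            }
            return new ToyStream<>(out, context);
        }

        // New overload added next to it; it passes the context on every call.
        <VR> ToyStream<VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper) {
            List<VR> out = new ArrayList<>();
            for (V v : values) {
                out.add(mapper.apply(v, context));
            }
            return new ToyStream<>(out, context);
        }

        List<V> values() {
            return values;
        }
    }

    // Hypothetical helper that builds a RecordContext with fixed metadata.
    static RecordContext fixedContext(String topic, int partition, long offset, long timestamp) {
        return new RecordContext() {
            public String topic() { return topic; }
            public int partition() { return partition; }
            public long offset() { return offset; }
            public long timestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        ToyStream<String> stream =
            new ToyStream<>(Arrays.asList("a", "bb"), fixedContext("input", 0, 42L, 1000L));

        // One-argument lambda: resolves to the existing overload.
        System.out.println(stream.mapValues(v -> v.length()).values());

        // Two-argument lambda: resolves to the new overload.
        System.out.println(
            stream.mapValues((v, ctx) -> v + "@" + ctx.topic() + "-" + ctx.partition()).values());
    }
}
```

In this sketch the two overloads stay unambiguous for lambdas only because the lambdas differ in arity; whether the same holds for every overload in the real DSL would have to be checked case by case.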
About ProcessorContext vs RecordContext, you are right. I think I need to implement a prototype to understand the full picture, as some parts of the KIP might not be as straightforward as I thought.

Cheers,
Jeyhun

On Wed, Jul 5, 2017 at 10:40 AM Damian Guy <damian....@gmail.com> wrote:

> Hi Jeyhun,
>
> Is the intention that these methods are new overloads on the KStream, KTable, etc?
>
> It is worth noting that a ProcessorContext is not a RecordContext. A RecordContext, as it stands, only exists during the processing of a single record, whereas the ProcessorContext exists for the lifetime of the Processor. So it doesn't make sense to cast a ProcessorContext to a RecordContext.
> You mentioned above passing the InternalProcessorContext to the init() calls. It is internal for a reason and I think it should remain that way. It might be better to move the recordContext() method from InternalProcessorContext to ProcessorContext.
>
> In the KIP you have an example showing:
> richMapper.init((RecordContext) processorContext);
> But the interface is:
> public interface RichValueMapper<V, VR> {
>     VR apply(final V value, final RecordContext recordContext);
> }
> i.e., there is no init(...); besides, as above, this wouldn't make sense.
>
> Thanks,
> Damian
>
> On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com> wrote:
>
> > Hi Matthias,
> >
> > Actually my intent was to provide the RichInitializer, and later on we could provide the context of the record as you also mentioned.
> > I removed it so as not to confuse users.
> > Regarding the RecordContext and ProcessorContext interfaces, I just noticed the InternalProcessorContext class. Can't we pass this as a parameter to the init() method of processors? Then we would be able to get the RecordContext easily with just a method call.
> >
> > Cheers,
> > Jeyhun
> >
> > On Thu, Jun 29, 2017 at 10:14 PM Matthias J.
> > Sax <matth...@confluent.io> wrote:
> >
> > > One more thing:
> > >
> > > I don't think `RichInitializer` makes sense. As we don't have any input record, there is also no context. We could of course provide the context of the record that triggers the init call, but this seems to be semantically questionable. Also, the context for this first record will be provided by the subsequent call to aggregate anyway.
> > >
> > > -Matthias
> > >
> > > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > > Thanks for updating the KIP.
> > > >
> > > > I have one concern with regard to backward compatibility. You suggest using RecordContext as the base interface for ProcessorContext. This will break compatibility.
> > > >
> > > > I think we should just have two independent interfaces. Our own ProcessorContextImpl class would implement both. This allows us to cast it to `RecordContext` and thus limit the visible scope.
> > > >
> > > > -Matthias
> > > >
> > > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > > >> Hi all,
> > > >>
> > > >> I updated the KIP w.r.t. the discussion and comments.
> > > >> Basically I eliminated overloads for a particular method if there are more than 3.
> > > >> As we can see there are a lot of overloads (and more will come with KIP-149 :) )
> > > >> So, is it wise to
> > > >> wait for the result of the constructive DSL thread, or
> > > >> extend the KIP to address this issue as well, or
> > > >> continue as it is?
> > > >>
> > > >> Cheers,
> > > >> Jeyhun
> > > >>
> > > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>
> > > >>> LGTM. Thanks!
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>
> > > >>>> Thanks for the comment Matthias.
> > > >>>> After all the discussion (thanks to all participants), I think this (a single method that passes in a RecordContext object) is the best alternative.
> > > >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into the KIP by adding a related method inside the RecordContext interface.
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> > > >>>>
> > > >>>> Cheers,
> > > >>>> Jeyhun
> > > >>>>
> > > >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > >>>>> I would like to push this discussion further. It seems we got nice alternatives (thanks for the summary Jeyhun!).
> > > >>>>>
> > > >>>>> With respect to RichFunctions and allowing them to be stateful, I have my doubts, as expressed already. From my understanding, the idea was to give access to record metadata information only. If you want to do a stateful computation you should rather use #transform().
> > > >>>>>
> > > >>>>> Furthermore, as pointed out, we would need to switch to a supplier pattern, introducing many more overloads.
> > > >>>>>
> > > >>>>> For those reasons, I advocate for a simple interface with a single method that passes in a RecordContext object.
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > > >>>>>> Thanks for the comprehensive summary!
> > > >>>>>>
> > > >>>>>> Personally I'd prefer the option of passing RecordContext as an additional parameter into the overloaded function. But I'm also open to other arguments if there is something that I have overlooked.
> > > >>>>>>
> > > >>>>>> Guozhang
> > > >>>>>>
> > > >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Thanks for your comments Matthias and Guozhang.
> > > >>>>>>>
> > > >>>>>>> Below I give a quick summary of the main alternatives we looked at to introduce the Rich functions (I will refer to them as Rich functions until we find a better name). Initially the proposed alternatives were not backwards-compatible, so I will not mention them.
> > > >>>>>>> The related discussions are spread across the KIP-149 and this KIP's (KIP-159) discussion threads.
> > > >>>>>>>
> > > >>>>>>> 1. The idea of rich functions came up in the KIP-149 discussion thread. As a result we extended KIP-149 to support Rich functions as well.
> > > >>>>>>>
> > > >>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext) method. Afterwards, Damian suggested that we should not provide ProcessorContext to users. As a result, we separated the two problems into two separate KIPs, as it seems they can be solved in parallel.
> > > >>>>>>>
> > > >>>>>>> - One approach we considered was:
> > > >>>>>>>
> > > >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> > > >>>>>>>     VR apply(final K key, final V value);
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction {
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RichFunction {
> > > >>>>>>>     void init(RecordContext recordContext);
> > > >>>>>>>     void close();
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface RecordContext {
> > > >>>>>>>     String applicationId();
> > > >>>>>>>     TaskId taskId();
> > > >>>>>>>     StreamsMetrics metrics();
> > > >>>>>>>     String topic();
> > > >>>>>>>     int partition();
> > > >>>>>>>     long offset();
> > > >>>>>>>     long timestamp();
> > > >>>>>>>     Map<String, Object> appConfigs();
> > > >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> public interface ProcessorContext extends RecordContext {
> > > >>>>>>>     // all methods but the ones in RecordContext
> > > >>>>>>> }
> > > >>>>>>>
> > > >>>>>>> As a result:
> > > >>>>>>> * All "withKey" and "withoutKey" interfaces can be converted to their Rich counterparts (with empty init() and close() methods).
> > > >>>>>>> * All related Processors will accept Rich interfaces in their constructors.
> > > >>>>>>> * So, we convert the related "withKey" or "withoutKey" interfaces to Rich interfaces while building the topology and initialize the related processors with Rich interfaces only.
> > > >>>>>>> * We will not need overloaded methods for rich functions, as Rich interfaces extend withKey interfaces. We will just check the object type and act accordingly.
> > > >>>>>>>
> > > >>>>>>> 3. There were some thoughts that the above approach does not support lambdas, so we should support only one method, only init(RecordContext), as part of Rich interfaces.
> > > >>>>>>> This is still in discussion. Personally I think Rich interfaces are by definition lambda-free and we should not care much about it.
> > > >>>>>>>
> > > >>>>>>> 4. Thanks to Matthias's discussion, an alternative we considered was to pass in the RecordContext as a method parameter. This might even allow us to use lambdas, and we could keep the name RichFunction as we preserve the nature of being a function.
> > > >>>>>>> "If you go with `init()` and `close()` we basically allow users to have an in-memory state for a function. Thus, we cannot share a single instance of RichValueMapper (etc) over multiple tasks and we would need a supplier pattern similar to #transform(). And this would "break the flow" of the API, as (Rich)ValueMapperSupplier would not inherit from ValueMapper and thus we would need many new overloads for KStream/KTable classes". (Copy-pasted from Matthias's email)
> > > >>>>>>>
> > > >>>>>>> Cheers,
> > > >>>>>>> Jeyhun
> > > >>>>>>>
> > > >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>>> Yes, we did consider this, and there is no consensus yet on what the best alternative is.
> > > >>>>>>>>
> > > >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick summary of the current state of the discussion?
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> > > >>>>>>>>> Thanks for the explanation Jeyhun and Matthias.
> > > >>>>>>>>>
> > > >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering if you guys have considered a slightly different approach for rich functions, that is, to add the `RecordContext` into the apply functions as an additional parameter. For example:
> > > >>>>>>>>>
> > > >>>>>>>>> ---------------------------
> > > >>>>>>>>>
> > > >>>>>>>>> interface RichValueMapper<V, VR> {
> > > >>>>>>>>>
> > > >>>>>>>>>     VR apply(final V value, final RecordContext context);
> > > >>>>>>>>>
> > > >>>>>>>>> }
> > > >>>>>>>>>
> > > >>>>>>>>> ...
> > > >>>>>>>>>
> > > >>>>>>>>> // then in KStreams
> > > >>>>>>>>>
> > > >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > > >>>>>>>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper <? super V, ? extends VR> mapper);
> > > >>>>>>>>>
> > > >>>>>>>>> -------------------------------
> > > >>>>>>>>>
> > > >>>>>>>>> The caveat is that it will introduce more overloads; but I think the #.overloads are mainly introduced by 1) serde overrides and 2) state-store-supplier overrides, both of which can be reduced in the near future, and I felt this overloading is still worthwhile, as it has the following benefits:
> > > >>>>>>>>>
> > > >>>>>>>>> 1) still allows lambda expressions.
> > > >>>>>>>>> 2) clearer code path (do not need to "convert" from non-rich functions to rich functions)
> > > >>>>>>>>>
> > > >>>>>>>>> Maybe this approach has already been discussed and I may have overlooked it in the email thread; anyways, lmk.
> > > >>>>>>>>>
> > > >>>>>>>>> Guozhang
> > > >>>>>>>>>
> > > >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API improvement ideas are overlapping and/or contradicting each other. For this reason, not all ideas can be accomplished and some Jiras might just be closed as "won't fix".
> > > >>>>>>>>>>
> > > >>>>>>>>>> For this reason, we try to do those KIP discussions with a large scope, to get an overall picture and converge to an overall consistent API.
> > > >>>>>>>>>>
> > > >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It might be sufficient though, to do a single xxxWithContext() overload that will provide key+value+context. Otherwise, it might get too messy having ValueMapper, ValueMapperWithKey, ValueMapperWithContext, ValueMapperWithKeyWithContext.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API change and this might mitigate the overload problem. Not for simple functions like map/flatMap etc. but for joins and aggregations.
> > > >>>>>>>>>>
> > > >>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally fine to break the pure functional interface, and add
> > > >>>>>>>>>>
> > > >>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)` (or `init(...)`, or any better name), but not `close()`
> > > >>>>>>>>>>
> > > >>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper, WithRecordContext
> > > >>>>>>>>>>
> > > >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a "pure function" interface and also sacrifice lambdas.
> > > >>>>>>>>>>
> > > >>>>>>>>>> I am personally a little bit undecided what the better option might be. Curious to hear what others think about this trade-off.
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> > > >>>>>>>>>>> Hi Guozhang,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> It subsumes it partially. Initially the idea was to support RichFunctions as a separate interface. Throughout the discussion, however, we considered that maybe overloading the related methods (with a RecordContext param) is a better approach than providing a separate RichFunction interface.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Cheers,
> > > >>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> > > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Dear community,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> As we discussed in the KIP-149 [DISCUSS] thread [1], I would like to initiate a KIP for rich functions (interfaces) [2].
> > > >>>>>>>>>>>>> I would like to get your comments.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> [1] http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> > > >>>>>>>>>>>>> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>>> --
> > > >>>>>>>>>>>>> -Cheers
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Jeyhun
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> --
> > > >>>>>>>>>>>> -- Guozhang
> > > >>>>>>> --
> > > >>>>>>> -Cheers
> > > >>>>>>>
> > > >>>>>>> Jeyhun
> > > >>>> --
> > > >>>> -Cheers
> > > >>>>
> > > >>>> Jeyhun
> > > >>> --
> > > >>> -- Guozhang
> > --
> > -Cheers
> >
> > Jeyhun
--
-Cheers

Jeyhun