Hi Jeyhun,

Is the intention that these methods are new overloads on KStream, KTable, etc.?
It is worth noting that a ProcessorContext is not a RecordContext. A RecordContext, as it stands, only exists during the processing of a single record, whereas the ProcessorContext exists for the lifetime of the Processor. So it doesn't make sense to cast a ProcessorContext to a RecordContext.

You mentioned above passing the InternalProcessorContext to the init() calls. It is internal for a reason and I think it should remain that way. It might be better to move the recordContext() method from InternalProcessorContext to ProcessorContext.

In the KIP you have an example showing:

richMapper.init((RecordContext) processorContext);

But the interface is:

public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

i.e., there is no init(...). Besides, as noted above, the cast wouldn't make sense anyway.

Thanks,
Damian

On Tue, 4 Jul 2017 at 23:30 Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Matthias,
>
> Actually my intent was to provide to RichInitializer and later on we could
> provide the context of the record as you also mentioned.
> I removed that so as not to confuse the users.
> Regarding the RecordContext and ProcessorContext interfaces, I just
> realized the InternalProcessorContext class. Can't we pass this as a
> parameter to init() method of processors? Then we would be able to get
> RecordContext easily with just a method call.
>
>
> Cheers,
> Jeyhun
>
> On Thu, Jun 29, 2017 at 10:14 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > One more thing:
> >
> > I don't think `RichInitializer` does make sense. As we don't have any
> > input record, there is also no context. We could of course provide the
> > context of the record that triggers the init call, but this seems to be
> > semantically questionable. Also, the context for this first record will
> > be provided by the consecutive call to aggregate anyways.
> >
> >
> > -Matthias
> >
> > On 6/29/17 1:11 PM, Matthias J. Sax wrote:
> > > Thanks for updating the KIP.
> > >
> > > I have one concern with regard to backward compatibility. You suggest to
> > > use RecordContext as base interface for ProcessorContext. This will
> > > break compatibility.
> > >
> > > I think, we should just have two independent interfaces. Our own
> > > ProcessorContextImpl class would implement both. This allows us to cast
> > > it to `RecordContext` and thus limit the visible scope.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/27/17 1:35 PM, Jeyhun Karimov wrote:
> > >> Hi all,
> > >>
> > >> I updated the KIP w.r.t. discussion and comments.
> > >> Basically I eliminated overloads for particular method if they are more
> > >> than 3.
> > >> As we can see there are a lot of overloads (and more will come with KIP-149
> > >> :) )
> > >> So, is it wise to
> > >> wait for the result of the constructive DSL thread or
> > >> extend KIP to address this issue as well or
> > >> continue as it is?
> > >>
> > >> Cheers,
> > >> Jeyhun
> > >>
> > >> On Wed, Jun 14, 2017 at 11:29 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>
> > >>> LGTM. Thanks!
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>> On Tue, Jun 13, 2017 at 2:20 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Thanks for the comment Matthias. After all the discussion (thanks to all
> > >>>> participants), I think this (single method that passes in a RecordContext
> > >>>> object) is the best alternative.
> > >>>> Just a side note: I think KAFKA-3907 [1] can also be integrated into the
> > >>>> KIP by adding related method inside RecordContext interface.
> > >>>>
> > >>>>
> > >>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
> > >>>>
> > >>>>
> > >>>> Cheers,
> > >>>> Jeyhun
> > >>>>
> > >>>> On Tue, Jun 13, 2017 at 7:50 PM Matthias J. Sax <matth...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> I would like to push this discussion further.
> > >>>>> It seems we got nice alternatives (thanks for the summary Jeyhun!).
> > >>>>>
> > >>>>> With respect to RichFunctions and allowing them to be stateful, I have
> > >>>>> my doubt as expressed already. From my understanding, the idea was to
> > >>>>> give access to record metadata information only. If you want to do a
> > >>>>> stateful computation you should rather use #transform().
> > >>>>>
> > >>>>> Furthermore, as pointed out, we would need to switch to a
> > >>>>> supplier-pattern introducing many more overloads.
> > >>>>>
> > >>>>> For those reasons, I advocate for a simple interface with a single method
> > >>>>> that passes in a RecordContext object.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>>
> > >>>>> On 6/6/17 5:15 PM, Guozhang Wang wrote:
> > >>>>>> Thanks for the comprehensive summary!
> > >>>>>>
> > >>>>>> Personally I'd prefer the option of passing RecordContext as an additional
> > >>>>>> parameter into the overloaded function. But I'm also open to other arguments
> > >>>>>> if there is something that I have overlooked.
> > >>>>>>
> > >>>>>> Guozhang
> > >>>>>>
> > >>>>>>
> > >>>>>> On Mon, Jun 5, 2017 at 3:19 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> Thanks for your comments Matthias and Guozhang.
> > >>>>>>>
> > >>>>>>> Below I mention the quick summary of the main alternatives we looked at to
> > >>>>>>> introduce the Rich functions (I will refer to it as Rich functions until we
> > >>>>>>> find better/another name). Initially the proposed alternatives were not
> > >>>>>>> backwards-compatible, so I will not mention them.
> > >>>>>>> The related discussions are spread in KIP-149 and in this KIP (KIP-159)
> > >>>>>>> discussion threads.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 1.
> > >>>>>>> The idea of rich functions came into the stage with KIP-149, in the
> > >>>>>>> discussion thread. As a result we extended KIP-149 to support Rich
> > >>>>>>> functions as well.
> > >>>>>>>
> > >>>>>>> 2. As part of the Rich functions, we provided an init(ProcessorContext)
> > >>>>>>> method. Afterwards, Damian suggested that we should not provide
> > >>>>>>> ProcessorContext to users. As a result, we separated the two problems into
> > >>>>>>> two separate KIPs, as it seems they can be solved in parallel.
> > >>>>>>>
> > >>>>>>> - One approach we considered was:
> > >>>>>>>
> > >>>>>>> public interface ValueMapperWithKey<K, V, VR> {
> > >>>>>>>     VR apply(final K key, final V value);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RichValueMapper<K, V, VR> extends RichFunction {
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RichFunction {
> > >>>>>>>     void init(RecordContext recordContext);
> > >>>>>>>     void close();
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface RecordContext {
> > >>>>>>>     String applicationId();
> > >>>>>>>     TaskId taskId();
> > >>>>>>>     StreamsMetrics metrics();
> > >>>>>>>     String topic();
> > >>>>>>>     int partition();
> > >>>>>>>     long offset();
> > >>>>>>>     long timestamp();
> > >>>>>>>     Map<String, Object> appConfigs();
> > >>>>>>>     Map<String, Object> appConfigsWithPrefix(String prefix);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> public interface ProcessorContext extends RecordContext {
> > >>>>>>>     // all methods but the ones in RecordContext
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> As a result:
> > >>>>>>> *. All "withKey" and "withoutKey" interfaces can be converted to their
> > >>>>>>> Rich counterparts (with empty init() and close() methods)
> > >>>>>>> *. All related Processors will accept Rich interfaces in their
> > >>>>>>> constructors.
> > >>>>>>> *.
> > >>>>>>> So, we convert the related "withKey" or "withoutKey" interfaces to Rich
> > >>>>>>> interface while building the topology and initialize the related processors
> > >>>>>>> with Rich interfaces only.
> > >>>>>>> *. We will not need overloaded methods for rich functions as Rich
> > >>>>>>> interfaces extend withKey interfaces. We will just check the object type
> > >>>>>>> and act accordingly.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 3. There were some thoughts that the above approach does not support lambdas
> > >>>>>>> so we should support only one method, only init(RecordContext), as part of
> > >>>>>>> Rich interfaces.
> > >>>>>>> This is still in discussion. Personally I think Rich interfaces are by
> > >>>>>>> definition lambda-free and we should not care much about it.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> 4. Thanks to Matthias's discussion, an alternative we considered was to
> > >>>>>>> pass in the RecordContext as method parameter. This might even allow us to
> > >>>>>>> use Lambdas and we could keep the name RichFunction as we preserve the
> > >>>>>>> nature of being a function.
> > >>>>>>> "If you go with `init()` and `close()` we basically
> > >>>>>>> allow users to have an in-memory state for a function. Thus, we cannot
> > >>>>>>> share a single instance of RichValueMapper (etc) over multiple tasks and
> > >>>>>>> we would need a supplier pattern similar to #transform(). And this would
> > >>>>>>> "break the flow" of the API, as (Rich)ValueMapperSupplier would not
> > >>>>>>> inherit from ValueMapper and thus we would need many new overload for
> > >>>>>>> KStream/KTable classes". (Copy paste from Matthias's email)
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Jeyhun
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Jun 5, 2017 at 5:18 AM Matthias J.
> > >>>>>>> Sax <matth...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Yes, we did consider this, and there is no consensus yet what the best
> > >>>>>>>> alternative is.
> > >>>>>>>>
> > >>>>>>>> @Jeyhun: the email thread got pretty long. Maybe you can give a quick
> > >>>>>>>> summary of the current state of the discussion?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 6/4/17 6:04 PM, Guozhang Wang wrote:
> > >>>>>>>>> Thanks for the explanation Jeyhun and Matthias.
> > >>>>>>>>>
> > >>>>>>>>> I have just read through both KIP-149 and KIP-159 and am wondering if you
> > >>>>>>>>> guys have considered a slightly different approach for rich functions, that is
> > >>>>>>>>> to add the `RecordContext` into the apply functions as an additional
> > >>>>>>>>> parameter. For example:
> > >>>>>>>>>
> > >>>>>>>>> ---------------------------
> > >>>>>>>>>
> > >>>>>>>>> interface RichValueMapper<V, VR> {
> > >>>>>>>>>
> > >>>>>>>>>     VR apply(final V value, final RecordContext context);
> > >>>>>>>>>
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> ...
> > >>>>>>>>>
> > >>>>>>>>> // then in KStreams
> > >>>>>>>>>
> > >>>>>>>>> <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper);
> > >>>>>>>>> <VR> KStream<K, VR> mapValueswithContext(RichValueMapper<? super V, ?
> > >>>>>>>>> extends VR> mapper);
> > >>>>>>>>>
> > >>>>>>>>> -------------------------------
> > >>>>>>>>>
> > >>>>>>>>> The caveat is that it will introduce more overloads; but I think the
> > >>>>>>>>> #.overloads are mainly introduced by 1) serde overrides and 2)
> > >>>>>>>>> state-store-supplier overrides, both of which can be reduced in the near
> > >>>>>>>>> future, and I felt this overloading is still worthwhile, as it has the
> > >>>>>>>>> following benefits:
> > >>>>>>>>>
> > >>>>>>>>> 1) still allow lambda expressions.
> > >>>>>>>>> 2) clearer code path (do not need to "convert" from non-rich functions to
> > >>>>>>>>> rich functions)
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Maybe this approach has already been discussed and I may have overlooked it in
> > >>>>>>>>> the email thread; anyways, lmk.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Jun 1, 2017 at 10:18 PM, Matthias J. Sax <matth...@confluent.io>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> I agree with Jeyhun. As already mentioned, the overall API improvement
> > >>>>>>>>>> ideas are overlapping and/or contradicting each other. For this reason,
> > >>>>>>>>>> not all ideas can be accomplished and some Jira might just be closed as
> > >>>>>>>>>> "won't fix".
> > >>>>>>>>>>
> > >>>>>>>>>> For this reason, we try to do those KIP discussions with a large scope
> > >>>>>>>>>> to get an overall picture to converge to an overall consistent API.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> @Jeyhun: about the overloads. Yes, we might get more overloads. It might
> > >>>>>>>>>> be sufficient though, to do a single xxxWithContext() overload that will
> > >>>>>>>>>> provide key+value+context.
> > >>>>>>>>>> Otherwise, it might get too messy having
> > >>>>>>>>>> ValueMapper, ValueMapperWithKey, ValueMapperWithContext,
> > >>>>>>>>>> ValueMapperWithKeyWithContext.
> > >>>>>>>>>>
> > >>>>>>>>>> On the other hand, we also have the "builder pattern" idea as an API
> > >>>>>>>>>> change and this might mitigate the overload problem. Not for simple
> > >>>>>>>>>> functions like map/flatMap etc. but for joins and aggregations.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On the other hand, as I mentioned in an older email, I am personally
> > >>>>>>>>>> fine to break the pure functional interface, and add
> > >>>>>>>>>>
> > >>>>>>>>>>   - interface WithRecordContext with method `open(RecordContext)` (or
> > >>>>>>>>>> `init(...)`, or any better name) -- but not `close()`
> > >>>>>>>>>>
> > >>>>>>>>>>   - interface ValueMapperWithRecordContext extends ValueMapper,
> > >>>>>>>>>> WithRecordContext
> > >>>>>>>>>>
> > >>>>>>>>>> This would allow us to avoid any overload. Of course, we don't get a
> > >>>>>>>>>> "pure function" interface and also sacrifice Lambdas.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> I am personally a little bit undecided what the better option might be.
> > >>>>>>>>>> Curious to hear what others think about this trade off.
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On 6/1/17 6:13 PM, Jeyhun Karimov wrote:
> > >>>>>>>>>>> Hi Guozhang,
> > >>>>>>>>>>>
> > >>>>>>>>>>> It subsumes partially. Initially the idea was to support RichFunctions as a
> > >>>>>>>>>>> separate interface. Throughout the discussion, however, we considered maybe
> > >>>>>>>>>>> overloading the related methods (with RecordContext param) is a better
> > >>>>>>>>>>> approach than providing a separate RichFunction interface.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Cheers,
> > >>>>>>>>>>> Jeyhun
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Fri, Jun 2, 2017 at 2:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Does this KIP subsume this ticket as well?
> > >>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4125
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Sat, May 20, 2017 at 9:05 AM, Jeyhun Karimov <je.kari...@gmail.com>
> > >>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Dear community,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> As we discussed in KIP-149 [DISCUSS] thread [1], I would like to initiate
> > >>>>>>>>>>>>> KIP for rich functions (interfaces) [2].
> > >>>>>>>>>>>>> I would like to get your comments.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> [1]
> > >>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1PMjdk2CslH12?subj=Re+DISCUSS+KIP+149+Enabling+key+access+in+ValueTransformer+ValueMapper+and+ValueJoiner
> > >>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>> Jeyhun
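
To make the lifetime argument at the top of this thread concrete, here is a minimal, self-contained Java sketch. All types in it are simplified, hypothetical stand-ins and not the real Kafka Streams interfaces: `SimpleRecordContext`, `SimpleProcessorContext`, `process()` and `demo()` are invented for illustration, and the `recordContext()` accessor on `ProcessorContext` is only the suggestion made above, not an existing API. It assumes the single-method `RichValueMapper` quoted from the KIP, which is what lets a lambda implement it.

```java
// Simplified, hypothetical stand-ins for the interfaces discussed in this thread;
// these are NOT the real Kafka Streams types.
final class RichMapperSketch {

    /** Metadata of the single record currently being processed. */
    interface RecordContext {
        String topic();
        int partition();
        long offset();
    }

    /** Lives as long as the processor; deliberately does not extend RecordContext. */
    interface ProcessorContext {
        String applicationId();
        /** Assumed accessor, following the suggestion to expose recordContext() here. */
        RecordContext recordContext();
    }

    /** Single-method interface as quoted from the KIP, so a lambda can implement it. */
    @FunctionalInterface
    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext recordContext);
    }

    /** Immutable snapshot created once per record. */
    static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;

        SimpleRecordContext(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    /** Long-lived context that only points at the current record's snapshot. */
    static final class SimpleProcessorContext implements ProcessorContext {
        private final String applicationId;
        private RecordContext current;

        SimpleProcessorContext(String applicationId) {
            this.applicationId = applicationId;
        }

        void setRecordContext(RecordContext recordContext) {
            this.current = recordContext;
        }

        @Override public String applicationId() { return applicationId; }
        @Override public RecordContext recordContext() { return current; }
    }

    /** Per-record step: swap in the new record's context, then call the mapper. */
    static <V, VR> VR process(SimpleProcessorContext context,
                              RichValueMapper<? super V, ? extends VR> mapper,
                              V value,
                              RecordContext recordContext) {
        context.setRecordContext(recordContext);
        return mapper.apply(value, context.recordContext());
    }

    static String demo() {
        SimpleProcessorContext context = new SimpleProcessorContext("demo-app");
        // No init()/close(): the mapper is stateless and receives the context per call.
        RichValueMapper<String, String> mapper =
            (value, rc) -> value + "@" + rc.topic() + "-" + rc.partition() + ":" + rc.offset();
        String first = process(context, mapper, "a", new SimpleRecordContext("orders", 0, 41L));
        String second = process(context, mapper, "b", new SimpleRecordContext("orders", 0, 42L));
        return first + "," + second;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a@orders-0:41,b@orders-0:42
    }
}
```

In this sketch the processor context is created once and reused, while a fresh record context is handed to the mapper on every call, so no cast from one to the other is needed and the mapper can be shared because it holds no state.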