Jan,

I am not sure I understand your argument that "we still going to present change.oldValue to the filter even though the record context() is for change.newValue". Are you referring to `KTableFilter#process()`? If so, could you point me to the lines of code you are concerned about?
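To make the question concrete, here is a minimal, self-contained sketch of the situation as I read it: a table update carries both a new and an old value, and the filter predicate is evaluated against each of them, while any record context would describe only the record that produced the new value. `ChangeFilterSketch`, `Change`, `filterValue` and `filterChange` are hypothetical stand-ins written for this illustration; they are not the Kafka Streams classes and do not claim to reproduce `KTableFilter#process()`.

```java
import java.util.function.BiPredicate;

public class ChangeFilterSketch {

    /** Hypothetical stand-in for a table update: the new value plus the value it replaces. */
    public static final class Change<V> {
        public final V newValue;
        public final V oldValue;

        public Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Applies the predicate to one side of the update; a rejected or absent value becomes null. */
    public static <K, V> V filterValue(BiPredicate<K, V> predicate, K key, V value) {
        return (value != null && predicate.test(key, value)) ? value : null;
    }

    /**
     * Filters both sides of the update. Note that the predicate also sees the OLD value,
     * although only the new value belongs to the record currently being processed.
     * Returns null when nothing is left to forward.
     */
    public static <K, V> Change<V> filterChange(BiPredicate<K, V> predicate, K key, Change<V> change) {
        V newValue = filterValue(predicate, key, change.newValue);
        V oldValue = filterValue(predicate, key, change.oldValue);
        if (newValue == null && oldValue == null) {
            return null;
        }
        return new Change<>(newValue, oldValue);
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> positive = (key, value) -> value > 0;
        Change<Integer> out = filterChange(positive, "a", new Change<>(5, -1));
        System.out.println(out.newValue + " / " + out.oldValue); // prints "5 / null"
    }
}
```

If the predicate were a "rich" function that also received record metadata, the second `filterValue` call is the one where the metadata and the value would no longer refer to the same record.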
Guozhang

On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> a remark of mine that got missed during migration:
>
> There is this problem that even though we have source.table.filter.join
> the statefulness happens at the table step, not at the join step. In a
> filter we still going to present change.oldValue to the filter even though the
> record context() is for change.newValue. I would go as far as applying
> the filter before the table processor. Not just to get KIP-159, but because
> I think it's a side effect of a non-ideal topology layout. If I can filter
> 99% of my records, my state could be way smaller. Also widely escalates the context
> of the KIP
>
> I can only see upsides of executing the filter first.
>
> Best Jan
>
>
>
> On 20.11.2017 22:22, Matthias J. Sax wrote:
>
>> I am moving this back to the DISCUSS thread... Last 10 emails were sent
>> to VOTE thread.
>>
>> Copying Guozhang's last summary below. Thanks for this summary. Very
>> comprehensive!
>>
>> It seems we all agree that the current implementation of the context
>> at PAPI level is ok, but we should not leak it into DSL.
>>
>> Thus, we can go with (2) or (3), where (3) is an extension to (2)
>> carrying the context to more operators than just sources. It also seems
>> that we all agree that many-to-one operations void the context.
>>
>> I still think that just going with plain (2) is too restrictive -- but
>> I am also fine if we don't go with the full proposal of (3).
>>
>> Also note that the two operators filter() and filterNot() don't modify
>> the record and thus for both, it would be absolutely valid to keep the
>> context.
>>
>> I personally would keep the context for at least all one-to-one
>> operators.
One-to-many is debatable and I am fine to not carry the
>> context further: at least the offset information is questionable for
>> this case -- note though, that semantically, the timestamp is inherited
>> via one-to-many, and I also think this applies to "topic" and
>> "partition". Thus, I think it's still valuable information we can carry
>> downstream.
>>
>>
>> -Matthias
>>
>>> Jan: which approach are you referring to as "the approach that is on the
>>> table would be perfect"?
>>>
>>> Note that in today's PAPI layer we are already effectively exposing the
>>> record context which has the issues that we have been discussing right now,
>>> and its semantics always refer to the "processing record" at hand.
>>> More specifically, we can think of processing a record a bit differently:
>>>
>>> 1) the record traverses the topology from source to sink; it may be
>>> transformed into a new object or even generate multiple new objects (think:
>>> branch) along the traversal. And the record context is referring to this
>>> processing record. Here the "lifetime" of the record lasts for the entire
>>> topology traversal and any new records of this traversal are treated as
>>> different transformed values of this record (this applies to join and
>>> aggregations as well).
>>>
>>> 2) the record being processed is wiped out in the first operator after the
>>> source, and NEW records are forwarded to downstream operators. I.e. each
>>> record only lives between two adjacent operators; once it has reached the new
>>> operator its lifetime has ended and new records are generated.
>>>
>>> I think in the past we have talked about Streams under both contexts, and we
>>> do not have a clear agreement. I agree that 2) is logically more
>>> understandable for users as it does not leak any internal implementation
>>> details (e.g.
for stream-table joins, the table record's traversal ends at the
>>> join operator as it is only materialized, while the stream record's
>>> traversal goes through the join operator and further down to the sinks).
>>> However, if we are going to follow interpretation 2) above, then even for
>>> non-stateful operators we would not inherit the record context. What we're
>>> discussing now seems to imply a third semantics:
>>>
>>> 3) a record would traverse "through" one-to-one (non-stateful) operators,
>>> will "replicate" at one-to-many (non-stateful) operators (think:
>>> "mapValues") and will "end" at many-to-one (stateful) operators where NEW records
>>> will be generated and forwarded to the downstream operators.
>>>
>>> Just wanted to lay the ground for discussions so we are all on the same
>>> page before chatting more.
>>>
>>>
>>> Guozhang
>>>
>>
>>
>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks a lot for correcting. It is a leftover from the past designs when
>>> punctuate() was not deprecated.
>>> I corrected.
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> I just re-read the KIP.
>>>>
>>>> One minor comment: we don't need to introduce any deprecated methods.
>>>> Thus, RichValueTransformer#punctuate can be removed completely instead
>>>> of introducing it as deprecated.
>>>>
>>>> Otherwise looks good to me.
>>>>
>>>> Thanks for being so patient!
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>
>>>>> Jeyhun,
>>>>>
>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP.
We should think >>>>> carefully if we should add this functionality to the DSL layer moving >>>>> forward since from what we discovered working on it the conclusion is >>>>> >>>> that >>>> >>>>> it would require revamping the public APIs quite a lot, and it's not >>>>> >>>> clear >>>> >>>>> if it is a good trade-off than asking users to call process() instead. >>>>> >>>>> >>>>> Guozhang >>>>> >>>>> >>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy <damian....@gmail.com> >>>>> wrote: >>>>> >>>>> Hi Jeyhun, thanks, looks good. >>>>>> Do we need to remove the line that says: >>>>>> >>>>>> - on-demand commit() feature >>>>>> >>>>>> Cheers, >>>>>> Damian >>>>>> >>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <je.kari...@gmail.com> >>>>>> >>>>> wrote: >>>> >>>>> Hi, >>>>>>> >>>>>>> I removed the 'commit()' feature, as we discussed. It simplified the >>>>>>> overall design of KIP a lot. >>>>>>> If it is ok, I would like to start a VOTE thread. >>>>>>> >>>>>>> Cheers, >>>>>>> Jeyhun >>>>>>> >>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax < >>>>>>> matth...@confluent.io >>>>>>> wrote: >>>>>>> >>>>>>> Thanks. I understand what you are saying, but I don't agree that >>>>>>>> >>>>>>>> but also we need a commit() method >>>>>>>>> >>>>>>>> I would just not provide `commit()` at DSL level and close the >>>>>>>> corresponding Jira as "not a problem" or similar. >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote: >>>>>>>> >>>>>>>>> Hi Matthias, >>>>>>>>> >>>>>>>>> Thanks for your comments. I agree that this is not the best way to >>>>>>>>> >>>>>>>> do. >>>>>> >>>>>>> A >>>>>>> >>>>>>>> bit of history behind this design. >>>>>>>>> >>>>>>>>> Prior doing this, I tried to provide ProcessorContext itself as an >>>>>>>>> >>>>>>>> argument >>>>>>>> >>>>>>>>> in Rich interfaces. However, we dont want to give users that >>>>>>>>> >>>>>>>> flexibility >>>>>>> >>>>>>>> and “power”. 
Moreover, ProcessorContext contains processor level >>>>>>>>> information and not Record level info. The only thing we need ij >>>>>>>>> ProcessorContext is commit() method. >>>>>>>>> >>>>>>>>> So, as far as I understood, we need recor context (offset, >>>>>>>>> timestamp >>>>>>>>> >>>>>>>> and >>>>>>> >>>>>>>> etc) but also we need a commit() method ( we dont want to provide >>>>>>>>> ProcessorContext as a parameter so users can use >>>>>>>>> >>>>>>>> ProcessorContext.commit() >>>>>>>> >>>>>>>>> ). >>>>>>>>> >>>>>>>>> As a result, I thought to “propagate” commit() call from >>>>>>>>> >>>>>>>> RecordContext >>>>>> >>>>>>> to >>>>>>> >>>>>>>> ProcessorContext() . >>>>>>>>> >>>>>>>>> >>>>>>>>> If there is a misunderstanding in motvation/discussion of >>>>>>>>> >>>>>>>> KIP/included >>>>>> >>>>>>> jiras please let me know. >>>>>>>>> >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Jeyhun >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax < >>>>>>>>> matth...@confluent.io >>>>>>>>> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> I am personally still not convinced, that we should add `commit()` >>>>>>>>>> >>>>>>>>> at >>>>>> >>>>>>> all. >>>>>>>> >>>>>>>>> @Guozhang: you created the original Jira. Can you elaborate a >>>>>>>>>> little >>>>>>>>>> bit? Isn't requesting commits a low level API that should not be >>>>>>>>>> >>>>>>>>> exposed >>>>>>> >>>>>>>> in the DSL? Just want to understand the motivation better. Why would >>>>>>>>>> anybody that uses the DSL ever want to request a commit? To me, >>>>>>>>>> requesting commits is useful if you manipulated state explicitly, >>>>>>>>>> >>>>>>>>> ie, >>>>>> >>>>>>> via Processor API. >>>>>>>>>> >>>>>>>>>> Also, for the solution: it seem rather unnatural to me, that we >>>>>>>>>> add >>>>>>>>>> `commit()` to `RecordContext` -- from my understanding, >>>>>>>>>> >>>>>>>>> `RecordContext` >>>>>>> >>>>>>>> is an helper object that provide access to record meta data. 
>>>>>>>>>> >>>>>>>>> Requesting >>>>>>> >>>>>>>> a commit is something quite different. Additionally, a commit does >>>>>>>>>> >>>>>>>>> not >>>>>> >>>>>>> commit a specific record but a `RecrodContext` is for a specific >>>>>>>>>> >>>>>>>>> record. >>>>>>> >>>>>>>> To me, this does not seem to be a sound API design if we follow this >>>>>>>>>> >>>>>>>>> path. >>>>>>>> >>>>>>>>> >>>>>>>>>> -Matthias >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> Thanks for your suggestions. >>>>>>>>>>> >>>>>>>>>>> I have some comments, to make sure that there is no >>>>>>>>>>> >>>>>>>>>> misunderstanding. >>>>>> >>>>>>> >>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to >>>>>>>>>>> >>>>>>>>>> enforce >>>>>>>> >>>>>>>>> user to consolidate this call as >>>>>>>>>>>> "processorContext.recordContext().commit()". And internal >>>>>>>>>>>> >>>>>>>>>>> implementation >>>>>>>> >>>>>>>>> of >>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is also >>>>>>>>>>>> >>>>>>>>>>> changed >>>>>>> >>>>>>>> to >>>>>>>> >>>>>>>>> this call. >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> - I think we should not deprecate `ProcessorContext.commit()`. >>>>>>>>>>> The >>>>>>>>>>> >>>>>>>>>> main >>>>>>> >>>>>>>> intuition that we introduce `commit()` in `RecordContext` is that, >>>>>>>>>>> `RecordContext` is the one which is provided in Rich interfaces. >>>>>>>>>>> So >>>>>>>>>>> >>>>>>>>>> if >>>>>>> >>>>>>>> user >>>>>>>>>> >>>>>>>>>>> wants to commit, then there should be some method inside >>>>>>>>>>> >>>>>>>>>> `RecordContext` >>>>>>>> >>>>>>>>> to >>>>>>>>>> >>>>>>>>>>> do so. 
Internally, `RecordContext.commit()` calls
>>>>>>>>>>> `ProcessorContext.commit()` (see the last code snippet in KIP-159):
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public void process(final K1 key, final V1 value) {
>>>>>>>>>>>
>>>>>>>>>>>     recordContext = new RecordContext() { // recordContext initialization is added in this KIP
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public void commit() {
>>>>>>>>>>>             context().commit();
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public long offset() {
>>>>>>>>>>>             return context().recordContext().offset();
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public long timestamp() {
>>>>>>>>>>>             return context().recordContext().timestamp();
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public String topic() {
>>>>>>>>>>>             return context().recordContext().topic();
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public int partition() {
>>>>>>>>>>>             return context().recordContext().partition();
>>>>>>>>>>>         }
>>>>>>>>>>>     };
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in this case IMO.
>>>>>>>>>>>
>>>>>>>>>>>> 2. Add the `task` reference to the impl class, `ProcessorRecordContext`, so
>>>>>>>>>>>> that it can implement the commit call itself.
>>>>>>>>>>>
>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to "transfer"
>>>>>>>>>>> `ProcessorContext.commit()` call to Rich interfaces, to support
>>>>>>>>>>> user-specific committing.
>>>>>>>>>>> To do so, we introduce `commit()` method in `RecordContext()` just only to
>>>>>>>>>>> call ProcessorContext.commit() inside.
(see the above code >>>>>>>>>>> snippet) >>>>>>>>>>> So, in Rich interfaces, we are not dealing with >>>>>>>>>>> >>>>>>>>>> `ProcessorRecordContext` >>>>>>>> >>>>>>>>> at all, and we leave all its methods as it is. >>>>>>>>>>> In this KIP, we made `RecordContext` to be the parent class of >>>>>>>>>>> `ProcessorRecordContext`, just because of they share quite amount >>>>>>>>>>> >>>>>>>>>> of >>>>>> >>>>>>> methods and it is logical to enable inheritance between those two. >>>>>>>>>>> >>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a >>>>>>>>>>> >>>>>>>>>> commit() >>>>>> >>>>>>> method, >>>>>>>>>> >>>>>>>>>>> is valid only within RecordContext interface (at least for now), >>>>>>>>>>>> >>>>>>>>>>> we >>>>>> >>>>>>> throw >>>>>>>>>> >>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and the code >>>>>>>>>>>> >>>>>>>>>>> snippet >>>>>>> >>>>>>>> below would need to be updated as well. >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> - I think above explanation covers this as well. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> I want to gain some speed to this KIP, as it has gone though many >>>>>>>>>>> >>>>>>>>>> changes >>>>>>>> >>>>>>>>> based on user/developer needs, both in >>>>>>>>>>> >>>>>>>>>> documentation-/implementation-wise. >>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Cheers, >>>>>>>>>>> Jeyhun >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang < >>>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks for the information Jeyhun. I had also forgot about >>>>>>>>>>>> >>>>>>>>>>> KAFKA-3907 >>>>>>> >>>>>>>> with >>>>>>>>>> >>>>>>>>>>> this KIP.. >>>>>>>>>>>> >>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we agreed >>>>>>>>>>>> >>>>>>>>>>> before, >>>>>>>> >>>>>>>>> to >>>>>>>>>> >>>>>>>>>>> add the commit() call to `RecordContext`. A few minor tweaks on >>>>>>>>>>>> >>>>>>>>>>> its >>>>>> >>>>>>> implementation: >>>>>>>>>>>> >>>>>>>>>>>> 1. 
Maybe we can deprecate the `commit()` in ProcessorContext, to >>>>>>>>>>>> >>>>>>>>>>> enforce >>>>>>>> >>>>>>>>> user to consolidate this call as >>>>>>>>>>>> "processorContext.recordContext().commit()". And internal >>>>>>>>>>>> >>>>>>>>>>> implementation >>>>>>>> >>>>>>>>> of >>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` is also >>>>>>>>>>>> >>>>>>>>>>> changed >>>>>>> >>>>>>>> to >>>>>>>> >>>>>>>>> this call. >>>>>>>>>>>> >>>>>>>>>>>> 2. Add the `task` reference to the impl class, >>>>>>>>>>>> >>>>>>>>>>> `ProcessorRecordContext`, so >>>>>>>>>> >>>>>>>>>>> that it can implement the commit call itself. >>>>>>>>>>>> >>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a >>>>>>>>>>>> >>>>>>>>>>> commit() >>>>>> >>>>>>> method, >>>>>>>>>>>> is valid only within RecordContext interface (at least for now), >>>>>>>>>>>> >>>>>>>>>>> we >>>>>> >>>>>>> throw >>>>>>>>>> >>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and the code >>>>>>>>>>>> >>>>>>>>>>> snippet >>>>>>> >>>>>>>> below would need to be updated as well. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax < >>>>>>>>>>>> >>>>>>>>>>> matth...@confluent.io >>>>>>>> >>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Fair point. This is a long discussion and I totally forgot that >>>>>>>>>>>>> >>>>>>>>>>>> we >>>>>> >>>>>>> discussed this. >>>>>>>>>>>>> >>>>>>>>>>>>> Seems I changed my opinion about including KAFKA-3907... >>>>>>>>>>>>> >>>>>>>>>>>>> Happy to hear what others think. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -Matthias >>>>>>>>>>>>> >>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>> >>>>>>>>>>>>>> It is probably my bad, the discussion was a bit long in this >>>>>>>>>>>>>> >>>>>>>>>>>>> thread. 
I >>>>>>>> >>>>>>>>> proposed the related issue in the related KIP discuss thread [1] >>>>>>>>>>>>>> >>>>>>>>>>>>> and >>>>>>> >>>>>>>> got >>>>>>>>>>>> >>>>>>>>>>>>> an >>>>>>>>>>>>> >>>>>>>>>>>>>> approval [2,3]. >>>>>>>>>>>>>> Maybe I misunderstood. >>>>>>>>>>>>>> >>>>>>>>>>>>>> [1] >>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj= >>>>>>>>>>>>>> >>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>> >>>>>>>>>>>>>> [2] >>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj= >>>>>>>>>>>>>> >>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>> >>>>>>>>>>>>>> [3] >>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj= >>>>>>>>>>>>>> >>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams >>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax < >>>>>>>>>>>>>> >>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Interesting. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I thought that https://issues.apache.org/ >>>>>>>>>>>>>>> >>>>>>>>>>>>>> jira/browse/KAFKA-4125 >>>>>> >>>>>>> is >>>>>>> >>>>>>>> the >>>>>>>>>>>> >>>>>>>>>>>>> main motivation for this KIP :) >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I also think, that we should not expose the full >>>>>>>>>>>>>>> >>>>>>>>>>>>>> ProcessorContext >>>>>> >>>>>>> at >>>>>>>> >>>>>>>>> DSL >>>>>>>>>>>> >>>>>>>>>>>>> level. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thus, overall I am not even sure if we should fix KAFKA-3907 >>>>>>>>>>>>>>> at >>>>>>>>>>>>>>> >>>>>>>>>>>>>> all. >>>>>>>> >>>>>>>>> Manual commits are something DSL users should not worry about >>>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>> >>>>>>> and >>>>>>>> >>>>>>>>> if >>>>>>>>>>>> >>>>>>>>>>>>> one really needs this, an advanced user can still insert a >>>>>>>>>>>>>>> >>>>>>>>>>>>>> dummy >>>>>> >>>>>>> `transform` to request a commit from there. 
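As a rough illustration of the "dummy transform" idea above, the following self-contained sketch forwards every value unchanged and requests a commit through the context every few records. `Context`, `PassThrough`, `CommitEvery` and `commitsRequested` are hypothetical stand-ins defined only so the example runs on its own; in a real application the commit would be requested via `ProcessorContext.commit()` from inside a transformer, whose exact interface is not reproduced here.

```java
public class CommitRequestSketch {

    /** Hypothetical stand-in for the one context method used in this sketch. */
    public interface Context {
        void commit();
    }

    /** Hypothetical stand-in for a transformer-like hook: init once, then one call per record. */
    public interface PassThrough<K, V> {
        void init(Context context);
        V transform(K key, V value);
    }

    /** Forwards each value unchanged and requests a commit every `interval` records. */
    public static final class CommitEvery<K, V> implements PassThrough<K, V> {
        private final int interval;
        private Context context;
        private int seen;

        public CommitEvery(int interval) {
            this.interval = interval;
        }

        @Override
        public void init(Context context) {
            this.context = context;
        }

        @Override
        public V transform(K key, V value) {
            seen++;
            if (seen % interval == 0) {
                // The request covers the task's progress so far, not this single record.
                context.commit();
            }
            return value;
        }
    }

    /** Small driver: counts how many commit requests are issued for the given number of records. */
    public static int commitsRequested(int records, int interval) {
        int[] commits = {0};
        CommitEvery<String, Integer> step = new CommitEvery<>(interval);
        step.init(() -> commits[0]++);
        for (int i = 0; i < records; i++) {
            step.transform("key-" + i, i);
        }
        return commits[0];
    }

    public static void main(String[] args) {
        System.out.println(commitsRequested(10, 3)); // prints 3
    }
}
```

The design point is that the commit request lives on the processor-level context, so no record-level object has to grow a `commit()` method for this use case.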
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main intuition is to solve [1], which is part of this KIP.
>>>>>>>>>>>>>>>> I agree with you that this might not seem semantically correct as we are
>>>>>>>>>>>>>>>> not committing record state.
>>>>>>>>>>>>>>>> Alternatively, we can remove commit() from RecordContext and add
>>>>>>>>>>>>>>>> ProcessorContext (which has commit() method) as an extra argument to Rich
>>>>>>>>>>>>>>>> methods:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>     VR apply(final V value,
>>>>>>>>>>>>>>>>              final K key,
>>>>>>>>>>>>>>>>              final RecordContext recordContext);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> we can adopt
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>     VR apply(final V value,
>>>>>>>>>>>>>>>>              final K key,
>>>>>>>>>>>>>>>>              final RecordContext recordContext,
>>>>>>>>>>>>>>>>              final ProcessorContext processorContext);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, in this case, a user can get confused as ProcessorContext and
>>>>>>>>>>>>>>>> RecordContext share some methods with the same name.
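For comparison, a minimal sketch of how the three-argument shape might be used from a lambda, with the record metadata read through a single `RecordContext` parameter. The accessor names (`offset()`, `timestamp()`, `topic()`, `partition()`) and the `apply` signature follow what is quoted in this thread; `fixedContext` and `TAG_WITH_ORIGIN` are hypothetical helpers added only so the example is self-contained, and the interfaces are redeclared locally rather than imported from Kafka Streams.

```java
public class RichValueMapperSketch {

    /** Read-only record metadata, using the accessor names quoted in this thread. */
    public interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
    }

    /** The three-argument shape: value first, then key, then the record context. */
    public interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    /** Hypothetical fixed-value context, present only so the sketch runs without a topology. */
    public static RecordContext fixedContext(final String topic, final int partition,
                                             final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
        };
    }

    /** Tags each value with the topic, partition and offset it came from; the key is unused. */
    public static final RichValueMapper<String, String, String> TAG_WITH_ORIGIN =
        (value, key, ctx) -> value + "@" + ctx.topic() + "-" + ctx.partition() + ":" + ctx.offset();

    public static void main(String[] args) {
        RecordContext ctx = fixedContext("orders", 2, 42L, 1511211740000L);
        System.out.println(TAG_WITH_ORIGIN.apply("v", "k", ctx)); // prints "v@orders-2:42"
    }
}
```

With a second context parameter, a call such as `ctx.topic()` would have a same-named counterpart on the other argument, which is the source of confusion mentioned above.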
>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907 >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang < >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> wangg...@gmail.com >>>>>>> >>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Regarding #6 above, I'm still not clear why we would need >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> `commit()` >>>>>>>>>> >>>>>>>>>>> in >>>>>>>>>>>>> >>>>>>>>>>>>>> both ProcessorContext and RecordContext, could you elaborate >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> a >>>>>> >>>>>>> bit >>>>>>>> >>>>>>>>> more? >>>>>>>>>>>>> >>>>>>>>>>>>>> To me `commit()` is really a processor context not a record >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> context >>>>>>>> >>>>>>>>> logically: when you call that function, it means we would >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> commit >>>>>>> >>>>>>>> the >>>>>>>>>> >>>>>>>>>>> state >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> of the whole task up to this processed record, not only that >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> single >>>>>>>> >>>>>>>>> record >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> itself. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov < >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> je.kari...@gmail.com >>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the feedback. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - Fixed. 
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'd suggest moving the key parameter in the RichValueXX >>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> RichReducer >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> after the value parameters, as well as in the templates; >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> e.g. >>>>>> >>>>>>> public interface RichValueJoiner<V1, V2, VR, K> { >>>>>>>>>>>>>>>>>>> VR apply(final V1 value1, final V2 value2, final K >>>>>>>>>>>>>>>>>>> key, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> final >>>>>>>> >>>>>>>>> RecordContext >>>>>>>>>>>>>>>>>>> recordContext); >>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - Fixed. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 2. Some of the listed functions are not necessary since >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> their >>>>>> >>>>>>> pairing >>>>>>>>>>>> >>>>>>>>>>>>> APIs >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> are being deprecated in 1.0 already: >>>>>>>>>>>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(final >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> RichKeyValueMapper<? >>>>>> >>>>>>> super >>>>>>>>>>>> >>>>>>>>>>>>> K, >>>>>>>>>>>>> >>>>>>>>>>>>>> ? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> super V, KR> selector, >>>>>>>>>>>>>>>>>>> final Serde<KR> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> keySerde, >>>>>> >>>>>>> final Serde<V> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> valSerde); >>>>>> >>>>>>> <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, >>>>>>>>>>>>>>>>>>> final RichValueJoiner<? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> super >>>>>>> >>>>>>>> K, >>>>>>>> >>>>>>>>> ? >>>>>>>>>>>> >>>>>>>>>>>>> super >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> V, >>>>>>>>>>>>>>>>>>> ? super VT, ? extends VR> joiner, >>>>>>>>>>>>>>>>>>> final Serde<K> >>>>>>>>>>>>>>>>>>> keySerde, >>>>>>>>>>>>>>>>>>> final Serde<V> >>>>>>>>>>>>>>>>>>> valSerde); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Fixed >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 3. 
For a few functions where we are adding three APIs for >>>>>>>>>>>>>>>>>> a >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> combo >>>>>>>> >>>>>>>>> of >>>>>>>>>>>> >>>>>>>>>>>>> both >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> mapper / joiner, or both initializer / aggregator, or >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> adder / >>>>>> >>>>>>> subtractor, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I'm wondering if we can just keep one that use "rich" >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> functions >>>>>>> >>>>>>>> for >>>>>>>>>>>> >>>>>>>>>>>>> both; >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> so that we can have less overloads and let users who only >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> want >>>>>>> >>>>>>>> to >>>>>>>> >>>>>>>>> access >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> one of them to just use dummy parameter declarations. For >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> example: >>>>>>>>>> >>>>>>>>>>> <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> globalKTable, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> final >>>>>>>>>>>>>>>>>>> RichKeyValueMapper<? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> super >>>>>>>> >>>>>>>>> K, ? >>>>>>>>>>>>> >>>>>>>>>>>>>> super >>>>>>>>>>>>>>>>>>> V, ? extends GK> keyValueMapper, >>>>>>>>>>>>>>>>>>> final RichValueJoiner<? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> super >>>>>>> >>>>>>>> K, >>>>>>>> >>>>>>>>> ? >>>>>>>>>>>> >>>>>>>>>>>>> super >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> V, >>>>>>>>>>>>>>>>>>> ? super GV, ? extends RV> joiner); >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -Agreed. Fixed. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 4. For TimeWindowedKStream, I'm wondering why we do not >>>>>>>>>>>>>>>>>> make >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> its >>>>>>> >>>>>>>> Initializer also "rich" functions? I.e. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> - It was a typo. Fixed. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> 5. 
We need to move "RecordContext" from >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> o.a.k.processor.internals >>>>>>>> >>>>>>>>> to >>>>>>>>>>>> >>>>>>>>>>>>> o.a.k.processor. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 6. I'm not clear why we want to move `commit()` from >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> ProcessorContext >>>>>>>>>>>>> >>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> RecordContext? >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> - >>>>>>>>>>>>>>>>>> Because it makes sense logically and to reduce code >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> maintenance >>>>>>> >>>>>>>> (both >>>>>>>>>>>>> >>>>>>>>>>>>>> interfaces have offset() timestamp() topic() partition() >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> methods), I >>>>>>>>>>>> >>>>>>>>>>>>> inherit ProcessorContext from RecordContext. >>>>>>>>>>>>>>>>>> Since we need commit() method both in ProcessorContext and >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> in >>>>>> >>>>>>> RecordContext >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I move commit() method to parent class (RecordContext). >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang < >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Jeyhun, >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for the updated KIP, here are my comments. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 1. I'd suggest moving the key parameter in the >>>>>>>>>>>>>>>>>>> RichValueXX >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> and >>>>>>> >>>>>>>> RichReducer >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> after the value parameters, as well as in the templates; >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> e.g. 
>>>>>>>>>>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>     VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My motivation is that for lambda expressions in J8, users who do not
>>>>>>>>>>>>>>>>>>> care about the key but only the context, or vice versa, are likely to write
>>>>>>>>>>>>>>>>>>> it as (value1, value2, dummy, context) -> ... rather than putting the dummy at
>>>>>>>>>>>>>>>>>>> the beginning of the parameter list. Generally speaking we'd like to make all
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> -- Guozhang