Jan,

I'm not sure I understand your argument that "we still going to present
change.oldValue to the filter even though the record context() is for
change.newValue". Are you referring to `KTableFilter#process()`? If so,
could you point me to the lines of code you are concerned about?


Guozhang
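
For readers following the thread, here is a minimal, self-contained sketch of
the behavior being asked about. It is not the actual Kafka Streams source:
`TableFilterSketch`, `Change`, `computeValue` and `filterChange` are
hypothetical stand-ins, and the sketch assumes that a table filter evaluates
the predicate on both the new and the old value of a change, while the record
context (offset, timestamp, ...) describes only the record that carried the
new value.

```java
import java.util.function.BiPredicate;

public class TableFilterSketch {

    /** Hypothetical stand-in for a table update: the new value plus the value it replaces. */
    public static final class Change<V> {
        public final V newValue;
        public final V oldValue;

        public Change(V newValue, V oldValue) {
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    /** Returns the value if it passes the predicate, otherwise null (treated as a delete). */
    public static <K, V> V computeValue(K key, V value, BiPredicate<K, V> predicate) {
        return (value != null && predicate.test(key, value)) ? value : null;
    }

    /**
     * Applies the predicate to BOTH sides of the change. Any record context available
     * at this point belongs to the record that produced newValue, so the predicate
     * sees oldValue under a context that does not describe it.
     */
    public static <K, V> Change<V> filterChange(K key, Change<V> change, BiPredicate<K, V> predicate) {
        V newValue = computeValue(key, change.newValue, predicate);
        V oldValue = computeValue(key, change.oldValue, predicate);
        return new Change<>(newValue, oldValue);
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> positive = (k, v) -> v > 0;
        Change<Integer> out = filterChange("a", new Change<>(5, -3), positive);
        System.out.println("new=" + out.newValue + ", old=" + out.oldValue);
    }
}
```

Under these assumptions, filtering before the table is materialized would mean
the predicate only ever sees the incoming record, which is the case Jan
describes below.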


On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <jan.filip...@trivago.com>
wrote:

> a remark of mine that got missed during migration:
>
> There is this problem that even though we have source.table.filter.join,
> the statefulness happens at the table step, not at the join step. In a
> filter we still going to present change.oldValue to the filter even though
> the record context() is for change.newValue. I would go as far as applying
> the filter before the table processor. Not just to get KIP-159, but because
> I think it's a side effect of a non-ideal topology layout. If I can filter
> 99% of my records, my state could be way smaller. It also widely escalates
> the context of the KIP.
>
> I can only see upsides of executing the filter first.
>
> Best Jan
>
>
>
> On 20.11.2017 22:22, Matthias J. Sax wrote:
>
>> I am moving this back to the DISCUSS thread... Last 10 emails were sent
>> to VOTE thread.
>>
>> Copying Guozhang's last summary below. Thanks for this summary. Very
>> comprehensive!
>>
>> It seems, we all agree, that the current implementation of the context
>> at PAPI level is ok, but we should not leak it into DSL.
>>
>> Thus, we can go with (2) or (3), where (3) is an extension to (2)
>> carrying the context to more operators than just sources. It also seems,
>> that we all agree, that many-to-one operations void the context.
>>
>> I still think, that just going with plain (2) is too restrictive -- but
>> I am also fine if we don't go with the full proposal of (3).
>>
>> Also note, that the two operators filter() and filterNot() don't modify
>> the record and thus for both, it would be absolutely valid to keep the
>> context.
>>
>> I personally would keep the context for at least all one-to-one
>> operators. One-to-many is debatable and I am fine to not carry the
>> context further: at least the offset information is questionable for
>> this case -- note, though, that semantically, the timestamp is inherited
>> via one-to-many, and I also think this applies to "topic" and
>> "partition". Thus, I think it's still valuable information we can carry
>> downstream.
>>
>>
>> -Matthias
>>
>>> Jan: which approach are you referring to as "the approach that is on the
>>> table would be perfect"?
>>>
>>> Note that in today's PAPI layer we are already effectively exposing the
>>> record context, which has the issues that we have been discussing right
>>> now, and its semantics always refer to the "processing record" at hand.
>>> More specifically, we can think of processing a record a bit differently:
>>>
>>> 1) the record traverses the topology from source to sink; it may be
>>> transformed into a new object or even generate multiple new objects (think:
>>> branch) along the traversal. And the record context is referring to this
>>> processing record. Here the "lifetime" of the record lasts for the entire
>>> topology traversal and any new records of this traversal are treated as
>>> different transformed values of this record (this applies to joins and
>>> aggregations as well).
>>>
>>> 2) the record being processed is wiped out in the first operator after the
>>> source, and NEW records are forwarded to downstream operators. I.e. each
>>> record only lives between two adjacent operators; once it reaches the new
>>> operator its lifetime has ended and new records are generated.
>>>
>>> I think in the past we have talked about Streams under both contexts, and
>>> we do not have a clear agreement. I agree that 2) is logically more
>>> understandable for users as it does not leak any internal implementation
>>> details (e.g. for stream-table joins, the table record's traversal ends at
>>> the join operator as it is only materialized, while the stream record's
>>> traversal goes through the join operator further down until the sinks).
>>> However, if we are going to interpret following 2) above, then even for
>>> non-stateful operators we would not inherit the record context. What we're
>>> discussing now seems to imply a third semantics:
>>>
>>> 3) a record would traverse "through" one-to-one (non-stateful) operators,
>>> will "replicate" at one-to-many (non-stateful) operators (think:
>>> "mapValues") and will "end" at many-to-one (stateful) operators where NEW
>>> records will be generated and forwarded to the downstream operators.
>>>
>>> Just wanted to lay the ground for discussions so we are all on the same
>>> page before chatting more.
>>>
>>>
>>> Guozhang
>>>
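The third semantics above can be sketched in a few lines. This is only an
illustration under stated assumptions, not Kafka Streams code: `Context`,
`Rec`, `oneToOne`, `oneToMany` and `manyToOne` are hypothetical names, and a
`null` context stands for "the context ended here".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ContextPropagationSketch {

    /** Hypothetical record metadata (topic and offset only, for brevity). */
    public static final class Context {
        public final String topic;
        public final long offset;

        public Context(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    /** Hypothetical record: a value plus an optional (nullable) context. */
    public static final class Rec<V> {
        public final V value;
        public final Context context; // null means no context is available

        public Rec(V value, Context context) {
            this.value = value;
            this.context = context;
        }
    }

    /** One-to-one: the record passes "through", so the context is kept. */
    public static <V, R> Rec<R> oneToOne(Rec<V> in, Function<V, R> f) {
        return new Rec<>(f.apply(in.value), in.context);
    }

    /** One-to-many: the context is "replicated" onto every output record. */
    public static <V, R> List<Rec<R>> oneToMany(Rec<V> in, Function<V, List<R>> f) {
        List<Rec<R>> out = new ArrayList<>();
        for (R r : f.apply(in.value)) {
            out.add(new Rec<>(r, in.context));
        }
        return out;
    }

    /** Many-to-one: the inputs "end" here; the result is a NEW record without context. */
    public static <V, R> Rec<R> manyToOne(List<Rec<V>> in, Function<List<V>, R> f) {
        List<V> values = new ArrayList<>();
        for (Rec<V> r : in) {
            values.add(r.value);
        }
        return new Rec<>(f.apply(values), null);
    }

    public static void main(String[] args) {
        Context ctx = new Context("input-topic", 42L);
        Rec<String> source = new Rec<>("a b", ctx);
        Rec<String> upper = oneToOne(source, String::toUpperCase);
        List<Rec<String>> words = oneToMany(upper, v -> Arrays.asList(v.split(" ")));
        Rec<Integer> count = manyToOne(words, List::size);
        System.out.println("one-to-one keeps context: " + (upper.context == ctx));
        System.out.println("one-to-many keeps context: " + (words.get(1).context == ctx));
        System.out.println("many-to-one drops context: " + (count.context == null));
    }
}
```

Semantics 1) would correspond to `manyToOne` also forwarding a context, and
semantics 2) to every operator returning records with a `null` context.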
>>
>>
>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks a lot for correcting. It is a leftover from the past designs when
>>> punctuate() was not deprecated.
>>> I corrected.
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> I just re-read the KIP.
>>>>
>>>> One minor comment: we don't need to introduce any deprecated methods.
>>>> Thus, RichValueTransformer#punctuate can be removed completely instead
>>>> of introducing it as deprecated.
>>>>
>>>> Otherwise looks good to me.
>>>>
>>>> Thanks for being so patient!
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>
>>>>> Jeyhun,
>>>>>
>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We should think
>>>>> carefully if we should add this functionality to the DSL layer moving
>>>>> forward, since from what we discovered working on it the conclusion is
>>>>> that it would require revamping the public APIs quite a lot, and it's
>>>>> not clear if it is a better trade-off than asking users to call
>>>>> process() instead.
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy <damian....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jeyhun, thanks, looks good.
>>>>>> Do we need to remove the line that says:
>>>>>>
>>>>>>     - on-demand commit() feature
>>>>>>
>>>>>> Cheers,
>>>>>> Damian
>>>>>>
>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <je.kari...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I removed the 'commit()' feature, as we discussed. It simplified the
>>>>>>> overall design of the KIP a lot.
>>>>>>> If it is ok, I would like to start a VOTE thread.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jeyhun
>>>>>>>
>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks. I understand what you are saying, but I don't agree that
>>>>>>>>
>>>>>>>>> but also we need a commit() method
>>>>>>>>
>>>>>>>> I would just not provide `commit()` at DSL level and close the
>>>>>>>> corresponding Jira as "not a problem" or similar.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> Thanks for your comments. I agree that this is not the best way to do
>>>>>>>>> it. A bit of history behind this design.
>>>>>>>>>
>>>>>>>>> Prior to doing this, I tried to provide ProcessorContext itself as an
>>>>>>>>> argument in Rich interfaces. However, we don't want to give users that
>>>>>>>>> flexibility and “power”. Moreover, ProcessorContext contains
>>>>>>>>> processor-level information and not record-level info. The only thing
>>>>>>>>> we need in ProcessorContext is the commit() method.
>>>>>>>>>
>>>>>>>>> So, as far as I understood, we need the record context (offset,
>>>>>>>>> timestamp, etc.) but also we need a commit() method (we don't want to
>>>>>>>>> provide ProcessorContext as a parameter so users can use
>>>>>>>>> ProcessorContext.commit()).
>>>>>>>>>
>>>>>>>>> As a result, I thought to “propagate” the commit() call from
>>>>>>>>> RecordContext to ProcessorContext().
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If there is a misunderstanding in the motivation/discussion of the
>>>>>>>>> KIP/included Jiras, please let me know.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jeyhun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I am personally still not convinced that we should add `commit()` at
>>>>>>>>>> all.
>>>>>>>>>>
>>>>>>>>>> @Guozhang: you created the original Jira. Can you elaborate a little
>>>>>>>>>> bit? Isn't requesting commits a low-level API that should not be
>>>>>>>>>> exposed in the DSL? Just want to understand the motivation better. Why
>>>>>>>>>> would anybody that uses the DSL ever want to request a commit? To me,
>>>>>>>>>> requesting commits is useful if you manipulated state explicitly,
>>>>>>>>>> i.e., via the Processor API.
>>>>>>>>>>
>>>>>>>>>> Also, for the solution: it seems rather unnatural to me that we add
>>>>>>>>>> `commit()` to `RecordContext` -- from my understanding,
>>>>>>>>>> `RecordContext` is a helper object that provides access to record
>>>>>>>>>> metadata. Requesting a commit is something quite different.
>>>>>>>>>> Additionally, a commit does not commit a specific record, but a
>>>>>>>>>> `RecordContext` is for a specific record.
>>>>>>>>>>
>>>>>>>>>> To me, this does not seem to be a sound API design if we follow this
>>>>>>>>>> path.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>
>>>>>>>>>>> I have some comments, to make sure that there is no misunderstanding.
>>>>>>>>>>>
>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to
>>>>>>>>>>>> enforce user to consolidate this call as
>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>> implementation of `ProcessorContext.commit()` in
>>>>>>>>>>>> `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>
>>>>>>>>>>> - I think we should not deprecate `ProcessorContext.commit()`. The
>>>>>>>>>>> main intuition that we introduce `commit()` in `RecordContext` is that
>>>>>>>>>>> `RecordContext` is the one which is provided in Rich interfaces. So if
>>>>>>>>>>> the user wants to commit, then there should be some method inside
>>>>>>>>>>> `RecordContext` to do so. Internally, `RecordContext.commit()` calls
>>>>>>>>>>> `ProcessorContext.commit()` (see the last code snippet in KIP-159):
>>>>>>>>>>>
>>>>>>>>>>>      @Override
>>>>>>>>>>>      public void process(final K1 key, final V1 value) {
>>>>>>>>>>>
>>>>>>>>>>>          // recordContext initialization is added in this KIP
>>>>>>>>>>>          recordContext = new RecordContext() {
>>>>>>>>>>>              @Override
>>>>>>>>>>>              public void commit() {
>>>>>>>>>>>                  context().commit();
>>>>>>>>>>>              }
>>>>>>>>>>>
>>>>>>>>>>>              @Override
>>>>>>>>>>>              public long offset() {
>>>>>>>>>>>                  return context().recordContext().offset();
>>>>>>>>>>>              }
>>>>>>>>>>>
>>>>>>>>>>>              @Override
>>>>>>>>>>>              public long timestamp() {
>>>>>>>>>>>                  return context().recordContext().timestamp();
>>>>>>>>>>>              }
>>>>>>>>>>>
>>>>>>>>>>>              @Override
>>>>>>>>>>>              public String topic() {
>>>>>>>>>>>                  return context().recordContext().topic();
>>>>>>>>>>>              }
>>>>>>>>>>>
>>>>>>>>>>>              @Override
>>>>>>>>>>>              public int partition() {
>>>>>>>>>>>                  return context().recordContext().partition();
>>>>>>>>>>>              }
>>>>>>>>>>>        };
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in this case IMO.
>>>>>>>>>>>
>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the commit call
>>>>>>>>>>>> itself.
>>>>>>>>>>>
>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to "transfer" the
>>>>>>>>>>> `ProcessorContext.commit()` call to Rich interfaces, to support
>>>>>>>>>>> user-specific committing.
>>>>>>>>>>> To do so, we introduce the `commit()` method in `RecordContext` only
>>>>>>>>>>> to call ProcessorContext.commit() inside (see the above code snippet).
>>>>>>>>>>> So, in Rich interfaces, we are not dealing with
>>>>>>>>>>> `ProcessorRecordContext` at all, and we leave all its methods as they
>>>>>>>>>>> are.
>>>>>>>>>>> In this KIP, we made `RecordContext` the parent class of
>>>>>>>>>>> `ProcessorRecordContext`, just because they share quite a number of
>>>>>>>>>>> methods and it is logical to enable inheritance between those two.
>>>>>>>>>>>
>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a commit()
>>>>>>>>>>>> method, is valid only within RecordContext interface (at least for
>>>>>>>>>>>> now), we throw an exception in ProcessorRecordContext.commit()." and
>>>>>>>>>>>> the code snippet below would need to be updated as well.
>>>>>>>>>>>
>>>>>>>>>>> - I think the above explanation covers this as well.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I want to gain some speed on this KIP, as it has gone through many
>>>>>>>>>>> changes based on user/developer needs, both documentation- and
>>>>>>>>>>> implementation-wise.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jeyhun
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgotten about
>>>>>>>>>>>> KAFKA-3907 with this KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we agreed
>>>>>>>>>>>> before, to add the commit() call to `RecordContext`. A few minor
>>>>>>>>>>>> tweaks on its implementation:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in ProcessorContext, to
>>>>>>>>>>>> enforce user to consolidate this call as
>>>>>>>>>>>> "processorContext.recordContext().commit()". And internal
>>>>>>>>>>>> implementation of `ProcessorContext.commit()` in
>>>>>>>>>>>> `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the commit call
>>>>>>>>>>>> itself.
>>>>>>>>>>>>
>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to a commit()
>>>>>>>>>>>> method, is valid only within RecordContext interface (at least for
>>>>>>>>>>>> now), we throw an exception in ProcessorRecordContext.commit()." and
>>>>>>>>>>>> the code snippet below would need to be updated as well.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Fair point. This is a long discussion and I totally forgot that we
>>>>>>>>>>>>> discussed this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Seems I changed my opinion about including KAFKA-3907...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Happy to hear what others think.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit long in this thread. I
>>>>>>>>>>>>>> proposed the related issue in the related KIP discuss thread [1] and
>>>>>>>>>>>>>> got an approval [2,3].
>>>>>>>>>>>>>> Maybe I misunderstood.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 8:44 PM Matthias J. Sax <matth...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Interesting.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I thought that https://issues.apache.org/jira/browse/KAFKA-4125 is
>>>>>>>>>>>>>>> the main motivation for this KIP :)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I also think that we should not expose the full ProcessorContext at
>>>>>>>>>>>>>>> DSL level.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thus, overall I am not even sure if we should fix KAFKA-3907 at all.
>>>>>>>>>>>>>>> Manual commits are something DSL users should not worry about -- and
>>>>>>>>>>>>>>> if one really needs this, an advanced user can still insert a dummy
>>>>>>>>>>>>>>> `transform` to request a commit from there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 10/18/17 5:39 AM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main intuition is to solve [1], which is part of this KIP.
>>>>>>>>>>>>>>>> I agree with you that this might not seem semantically correct as we
>>>>>>>>>>>>>>>> are not committing record state.
>>>>>>>>>>>>>>>> Alternatively, we can remove commit() from RecordContext and add
>>>>>>>>>>>>>>>> ProcessorContext (which has commit() method) as an extra argument to
>>>>>>>>>>>>>>>> Rich methods:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> instead of
>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>      VR apply(final V value,
>>>>>>>>>>>>>>>>               final K key,
>>>>>>>>>>>>>>>>               final RecordContext recordContext);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> we can adopt
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> public interface RichValueMapper<V, VR, K> {
>>>>>>>>>>>>>>>>      VR apply(final V value,
>>>>>>>>>>>>>>>>               final K key,
>>>>>>>>>>>>>>>>               final RecordContext recordContext,
>>>>>>>>>>>>>>>>               final ProcessorContext processorContext);
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However, in this case, a user can get confused as ProcessorContext
>>>>>>>>>>>>>>>> and RecordContext share some methods with the same name.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-3907
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 17, 2017 at 3:19 AM Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding #6 above, I'm still not clear why we would need `commit()`
>>>>>>>>>>>>>>>>> in both ProcessorContext and RecordContext, could you elaborate a
>>>>>>>>>>>>>>>>> bit more?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> To me `commit()` is really a processor context not a record context
>>>>>>>>>>>>>>>>> logically: when you call that function, it means we would commit the
>>>>>>>>>>>>>>>>> state of the whole task up to this processed record, not only that
>>>>>>>>>>>>>>>>> single record itself.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Oct 16, 2017 at 9:19 AM, Jeyhun Karimov <
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> je.kari...@gmail.com
>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the feedback.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. I'd suggest moving the key parameter in the RichValueXX and
>>>>>>>>>>>>>>>>>>> RichReducer after the value parameters, as well as in the templates;
>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>      VR apply(final V1 value1, final V2 value2, final K key,
>>>>>>>>>>>>>>>>>>>               final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. Some of the listed functions are not necessary since their
>>>>>>>>>>>>>>>>>>> pairing APIs are being deprecated in 1.0 already:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <KR> KGroupedStream<KR, V> groupBy(
>>>>>>>>>>>>>>>>>>>     final RichKeyValueMapper<? super K, ? super V, KR> selector,
>>>>>>>>>>>>>>>>>>>     final Serde<KR> keySerde,
>>>>>>>>>>>>>>>>>>>     final Serde<V> valSerde);
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <VT, VR> KStream<K, VR> leftJoin(
>>>>>>>>>>>>>>>>>>>     final KTable<K, VT> table,
>>>>>>>>>>>>>>>>>>>     final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
>>>>>>>>>>>>>>>>>>>     final Serde<K> keySerde,
>>>>>>>>>>>>>>>>>>>     final Serde<V> valSerde);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Fixed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. For a few functions where we are adding three APIs for a combo of
>>>>>>>>>>>>>>>>>>> both mapper / joiner, or both initializer / aggregator, or adder /
>>>>>>>>>>>>>>>>>>> subtractor, I'm wondering if we can just keep one that uses "rich"
>>>>>>>>>>>>>>>>>>> functions for both; so that we can have fewer overloads and let users
>>>>>>>>>>>>>>>>>>> who only want to access one of them just use dummy parameter
>>>>>>>>>>>>>>>>>>> declarations. For example:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> <GK, GV, RV> KStream<K, RV> join(
>>>>>>>>>>>>>>>>>>>     final GlobalKTable<GK, GV> globalKTable,
>>>>>>>>>>>>>>>>>>>     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
>>>>>>>>>>>>>>>>>>>     final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner);
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Agreed. Fixed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 4. For TimeWindowedKStream, I'm wondering why we do not make its
>>>>>>>>>>>>>>>>>>> Initializer also "rich" functions? I.e.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - It was a typo. Fixed.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 5. We need to move "RecordContext" from o.a.k.processor.internals to
>>>>>>>>>>>>>>>>>>> o.a.k.processor.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 6. I'm not clear why we want to move `commit()` from ProcessorContext
>>>>>>>>>>>>>>>>>>> to RecordContext?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> - Because it makes sense logically and to reduce code maintenance
>>>>>>>>>>>>>>>>>> (both interfaces have offset() timestamp() topic() partition()
>>>>>>>>>>>>>>>>>> methods), I inherit ProcessorContext from RecordContext.
>>>>>>>>>>>>>>>>>> Since we need commit() method both in ProcessorContext and in
>>>>>>>>>>>>>>>>>> RecordContext, I move commit() method to parent class (RecordContext).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Oct 11, 2017 at 12:59 AM, Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the updated KIP, here are my comments.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 0. RichInitializer definition seems missing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. I'd suggest moving the key parameter in the RichValueXX and
>>>>>>>>>>>>>>>>>>> RichReducer after the value parameters, as well as in the templates;
>>>>>>>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> public interface RichValueJoiner<V1, V2, VR, K> {
>>>>>>>>>>>>>>>>>>>      VR apply(final V1 value1, final V2 value2, final K key,
>>>>>>>>>>>>>>>>>>>               final RecordContext recordContext);
>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My motivation is that for lambda expressions in J8, users that would
>>>>>>>>>>>>>>>>>>> not care about the key but only the context, or vice versa, are
>>>>>>>>>>>>>>>>>>> likely to write it as (value1, value2, dummy, context) -> ... rather
>>>>>>>>>>>>>>>>>>> than putting the dummy at the beginning of the parameter list.
>>>>>>>>>>>>>>>>>>> Generally speaking we'd like to make all
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>


-- 
-- Guozhang
