Jan,

I apologize for the delayed response.

> my suggestion would be that instead of
>
> SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
>
> we build
>
> SOURCE  -> KTABLEFILTER ->  KTABLESOURCE -> JOIN -> SINK


I agree that filtering before the KTable source makes sense and would be a
positive change to implement.
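
To make the difference between the two layouts concrete, here is a minimal,
self-contained Java sketch. It does not use the Kafka Streams API: a plain
HashMap stands in for the materialized store, and the class and method names
(FilterPushdownSketch, storeSizeFilterAfterSource,
storeSizeFilterBeforeSource) are hypothetical. It only illustrates, under
those assumptions, that filtering before materialization can leave a smaller
store.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class FilterPushdownSketch {

    // SOURCE -> KTABLESOURCE -> KTABLEFILTER:
    // every update is materialized first, so the store holds every key seen.
    static int storeSizeFilterAfterSource(List<Map.Entry<String, Integer>> updates) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> u : updates) {
            store.put(u.getKey(), u.getValue());
        }
        return store.size();
    }

    // SOURCE -> KTABLEFILTER -> KTABLESOURCE:
    // only values that pass the predicate are materialized; an update that
    // no longer passes removes the key, like a delete on the table.
    static int storeSizeFilterBeforeSource(List<Map.Entry<String, Integer>> updates,
                                           Predicate<Integer> keep) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> u : updates) {
            if (keep.test(u.getValue())) {
                store.put(u.getKey(), u.getValue());
            } else {
                store.remove(u.getKey());
            }
        }
        return store.size();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> updates = List.of(
                Map.entry("a", 1), Map.entry("b", 50),
                Map.entry("c", 2), Map.entry("d", 3));
        System.out.println("filter after source:  " + storeSizeFilterAfterSource(updates));
        System.out.println("filter before source: "
                + storeSizeFilterBeforeSource(updates, v -> v >= 10));
    }
}
```

In the second layout the filter only ever sees the incoming record, so there
is no old value without a record context to worry about at that step.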

But the situation above is just one scenario out of many we need to
consider.  I'm not sure we can cover all the implications from different
use cases ahead of time.

So I'm inclined to agree with Guozhang that we come up with clear "rules"
(I use the word rules for lack of a better term) for RecordContext usage
and inheritance. That way going forward we can have distinct expectations
of different use cases.
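
As a rough illustration of what such "rules" could look like, here is a
small, self-contained Java sketch. It is not the Kafka Streams API and not an
agreed design: the names (RecordContextRulesSketch, OperatorKind, Context,
inherit) are hypothetical, and the rule set is just one reading of the
options discussed in this thread (keep everything for one-to-one operators,
drop the offset for one-to-many operators and single-topic aggregations, and
keep only the partition and timestamp for joins).

```java
class RecordContextRulesSketch {

    // Hypothetical operator categories, following the terms used in this thread.
    enum OperatorKind { ONE_TO_ONE, ONE_TO_MANY, AGGREGATION, JOIN }

    // Hypothetical stand-in for the record context (topic, partition, offset, timestamp).
    static final class Context {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        Context(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns the context a result record would carry under this assumed rule set.
    // null / -1 mark fields that have no meaningful value for the result.
    static Context inherit(Context in, OperatorKind kind) {
        switch (kind) {
            case ONE_TO_ONE:
                // e.g. filter(): the record is unchanged, keep the full context
                return in;
            case ONE_TO_MANY:
            case AGGREGATION:
                // assumed single input topic: keep topic and partition, drop the offset
                return new Context(in.topic, in.partition, -1L, in.timestamp);
            case JOIN:
                // topic and offset are unknown for the output; the timestamp is
                // simply carried over here as a simplification
                return new Context(null, in.partition, -1L, in.timestamp);
            default:
                throw new IllegalArgumentException("unknown operator kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Context source = new Context("orders", 3, 42L, 1000L);
        Context joined = inherit(source, OperatorKind.JOIN);
        System.out.println(joined.topic + " " + joined.partition + " " + joined.offset);
    }
}
```

Whatever rule set we settle on, writing it down per operator category like
this would give users one place to look up what to expect.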

-Bill

On Fri, Dec 15, 2017 at 3:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Regarding the record context inheritance: I agree it may be a better idea
> for now to drop the information when we cannot come to a consensus
> about how the record context should be inherited. Like Bill I was a bit
> worried about the lack of such data lineage information for
> troubleshooting in operations or debugging in coding; but I think we can
> still try to come up with better solutions in the future by extending the
> current protocol, rather than coming up with something now that we later
> realize we need to change.
>
> Regarding the table / filter question: I agree with Jan that we could
> consider updating the builder so that we push down the filter earlier
> than the KTable source that materializes the store; on the other hand, I
> think Matthias' point is that even doing this does not completely exclude
> the scenarios where you'd have the old/new pairs in your tables, for
> example, consider:
>
> table1 = stream1.groupBy(...).aggregate(...)
> table2 = table1.filter(..., Materialized.as(...))
>
> In this case table2 is filtering on table1, which is not read from the
> source and hence already outputs the old/new pairs, so we still
> need to consider how to handle it.
>
>
> So I'd suggest the following execution plan towards KIP-159:
>
> 1) revisit our record context (topic, partition, offset, timestamp)
> protocol that is used at the DSL layer, make it clear at which high-level
> operators we should apply a certain inheritance rule, and at which others
> we should drop such information.
>     1.1) modify the lower-level PAPI that DSL leverages, to allow the
> caller (DSL) to modify the record context (note that today for lower-level
> API, the record context is always passed through when forwarding to the
> next processor node)
> 2) at the same time, consider optimizing the source KTable filter cases (I
> think we already have certain JIRA tickets for this) so that the filter
> operator is pushed earlier than the KTABLESOURCE node where materialization
> happens.
> 3) after 1) is done, come back to KIP-159 and add the proposed APIs.
>
>
> Guozhang
>
>
> On Thu, Dec 7, 2017 at 12:27 PM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> > Thank you Bill,
> >
> > I think this is reasonable. Do you have any suggestion
> > for handling oldValues in cases like
> >
> > builder.table().filter(RichPredicate).join()
> >
> > where we process a Change with old and new value and don't have a record
> > context for old.
> >
> > my suggestion would be that instead of
> >
> > SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
> >
> > we build
> >
> > SOURCE  -> KTABLEFILTER ->  KTABLESOURCE -> JOIN -> SINK
> >
> > We should build a topology like this from the beginning and not have
> > an optimisation phase afterwards.
> >
> > Any opinions?
> >
> > Best Jan
> >
> >
> >
> >
> > On 05.12.2017 17:34, Bill Bejeck wrote:
> >
> >> Matthias,
> >>
> >> Overall I agree with what you've presented here.
> >>
> > >> Initially, I was hesitant to remove information from the context of the
> > >> result records (Joins or Aggregations) with the thought that when there
> > >> are unexpected results, the source information would be useful for
> > >> tracing back where the error could have occurred.  But in the case of
> > >> Joins and Aggregations, the amount of data needed to do meaningful
> > >> analysis could be too much. For example, a join result could come from
> > >> two topics so you'd need to keep both original topic names, offsets,
> > >> etc. (plus the broker could have deleted the records in the interim so
> > >> even having offset could provide nothing).
> >>
> > >> I'm a bit long-winded here, but I've come full circle to your original
> > >> proposal that since Joins and Aggregations produce fundamentally new
> > >> types, we drop the corresponding information from the context even in
> > >> the case of single topic aggregations.
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Mon, Dec 4, 2017 at 7:02 PM, Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> > >>> I agree with Guozhang that just exposing meta data at the source level
> > >>> might not provide too much value. Furthermore, for timestamps we do
> > >>> already have a well defined contract and we should exploit it:
> > >>> timestamps can always be provided in a meaningful way.
> > >>>
> > >>> Also, for simple operations like KStream-filter/map the contract is
> > >>> simple and we can just use it. Same for KTable-filter/map (for new
> > >>> values).
> > >>>
> > >>> For aggregations, join, and oldValue, I could just drop some
> > >>> information and return `null`/-1, if the result record has no
> > >>> semantically meaningful meta data.
> > >>>
> > >>> For example, for aggregations, we could preserve the partition (as all
> > >>> agg-input-records have the same partition). For single input topic
> > >>> aggregation (what I guess is the most prominent case), we can also
> > >>> carry over the topic name (would often be an internal repartitioning
> > >>> topic name). Offsets don't have any semantic interpretation IMHO and we
> > >>> could return -1.
> > >>>
> > >>> For joins, we could keep the partition information. Topic and offset
> > >>> are both unknown/invalid for the output record IMHO.
> >>>
> >>> For the oldValue case, we can keep partition and for single input topic
> >>> case topic name. Timestamp might be -1 for now, but after we added
> >>> timestamps to KTable (what we plan to do anyway), we can also return a
> >>> valid timestamp. Offset would be -1 again (if we store offset in KTable
> >>> too, we could provide all offset as well -- but I don't see too much
> >>> value in doing this compared to the storage overhead this implies).
> >>>
> >>>
> >>> WDYT?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 11/29/17 4:14 AM, Jan Filipiak wrote:
> >>>
> >>>> Hi,
> >>>>
> > >>>> thank you for the summary and thanks for acknowledging that I do have
> > >>>> a point here.
> > >>>>
> > >>>> I don't like the second idea at all. Hence I started off this
> > >>>> discussion.
> > >>>>
> > >>>> I am just disappointed. Back then, when we had the discussion about how
> > >>>> to refactor store overload and IQ handling, I knew the path we were
> > >>>> taking was wrong. Having problems implementing these kinds of features
> > >>>> (which are really simple) is just a symptom of a messed-up IQ
> > >>>> implementation. I really wish I could have convinced you guys back
> > >>>> then. To be honest, with IQ we can continue here, as we materialize but
> > >>>> would not send oldValue, but with join you're out of luck with the
> > >>>> current setup.
> > >>>>
> > >>>> I of course recommend not introducing any optimizations here. I'd
> > >>>> recommend going towards what I recommended already back then. So I
> > >>>> wouldn't say we need to optimize anything later; we need to build
> > >>>> the topology better in the first place.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 28.11.2017 21:00, Guozhang Wang wrote:
> >>>>
> >>>>> Jan,
> >>>>>
> > >>>>> Thanks for your input, I can understand now that the oldValue is also
> > >>>>> exposed in the user-customized `filter` function and hence what record
> > >>>>> context we should expose is a problem. And I think it does bring up a
> > >>>>> good point to consider for KIP-159. The discussion may be a bit
> > >>>>> confusing to readers though, and hence I'd like to summarize the
> > >>>>> status quo along with a proposal:
> >>>>>
> > >>>>> In today's Streams DSL, when a KTable is created either from a source
> > >>>>> topic, or from a stateful operator, we will materialize the KTable
> > >>>>> with a backing state store; on the other hand, KTables created from a
> > >>>>> non-stateful operator like filter will not be backed by a state store
> > >>>>> by default unless users indicate so (e.g. using the overloaded
> > >>>>> function with the queryable name or store supplier).
> >>>>>
> >>>>> For example:
> >>>>>
> > >>>>> KTable table1 = builder.table("topic");            // a state store created for table1
> > >>>>> KTable table2 = table1.filter(..);                 // no state store created for table2
> > >>>>> KTable table3 = table1.filter(.., "storeName");    // a state store created for table3
> > >>>>> KTable table4 = table1.groupBy(..).aggregate(..);  // a state store created for table4
> >>>>>
> > >>>>> Because of that, the filter() operator above on table1 will always be
> > >>>>> exposed with oldValue and newValue; Damian's point is that we may
> > >>>>> optimize the first case such that table1 will only be materialized if
> > >>>>> users asked so (e.g. using the overloaded function with a store
> > >>>>> supplier), in which case we do not need to pass newValue / oldValue
> > >>>>> pairs (I think this is what Jan suggests as well, i.e. do filtering
> > >>>>> before materializing, so that we can have a smaller backing state
> > >>>>> store as well). But this optimization does not eliminate the
> > >>>>> possibility that we may still need to do the filter if users do
> > >>>>> specify "yes I do want the source KTable itself to be materialized,
> > >>>>> please". So the concern about how to expose the record context in
> > >>>>> such cases still persists.
> >>>>>
> >>>>>
> > >>>>> With that, regarding KIP-159 itself, here are my thoughts:
> > >>>>>
> > >>>>> 1) if we restrict the scope of exposing record context only to source
> > >>>>> KTables / KStreams, I feel the KIP itself does not bring much value
> > >>>>> given its required API change, because only the SourceKStream can
> > >>>>> safely maintain its record context, and for SourceKTable, if it is
> > >>>>> materialized, then even non-stateful operators like Join may still
> > >>>>> have a concern about exposing the record context.
> > >>>>>
> > >>>>> 2) an alternative idea is that we provide the semantics on how record
> > >>>>> context would be inherited across the operators for KTable / KStream
> > >>>>> and expose it in all operators (similarly in PAPI we would expose a
> > >>>>> much simpler contract), and make it a public contract that the
> > >>>>> Streams library will guarantee moving forward even if we optimize our
> > >>>>> topology builder; it may not align perfectly with the linear
> > >>>>> algebraic semantics but would be practically applicable for most
> > >>>>> cases; if users' semantics do not fit in the provided contract, then
> > >>>>> they may need to handle this themselves (embed such information in
> > >>>>> the value payload, for example).
> > >>>>>
> > >>>>> If people do not like the second idea, I'd suggest we hold off on
> > >>>>> pursuing the first direction since to me its beneficial scope is too
> > >>>>> limited compared to its cost.
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <
> >>>>> jan.filip...@trivago.com
> >>>>> wrote:
> >>>>>
> > >>>>>> Clearly we show the oldValue to the user. We have to, because we filter
> > >>>>>> after the store.
> > >>>>>> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96
> > >>>>>>
> >>>>>> I cannot help you following this. It is really obvious and I am
> >>>>>> running
> >>>>>> out of tools for explaining.
> >>>>>>
> >>>>>> Thanks for understanding my point to put filter before. Not only
> >>>>>> would it
> >>>>>> make the store smaller. It would make this feature reasonably
> >>>>>> possible and
> >>>>>> the framework easier. Interestingly it would also help to move IQ
> >>>>>> into more
> >>>>>> reasonable directions. And it might help understand that we do not
> >>>>>> need any
> >>>>>> intermediate representation of the topology,
> >>>>>>
> >>>>>> KIP-182 I have no clue what everyone has with their "bytestores" so
> >>>>>> broken. But putting another store after doesn't help when the store
> >>>>>> before
> >>>>>> is the problem.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 24.11.2017 05:08, Matthias J. Sax wrote:
> >>>>>>
> >>>>>>    From a DSL point of view, users only see the new value on a
> >>>>>>> KTable#filter anyway. So why should it be an issue that we use
> >>>>>>> <newValue,oldValue> pair under the hood?
> >>>>>>>
> >>>>>>> User sees newValue and gets corresponding RecordContext. I can't
> see
> >>>>>>> any
> >>>>>>> issue here?
> >>>>>>>
> > >>>>>>> I cannot follow here:
> > >>>>>>>
> > >>>>>>>> Even when we have a statefull operation last. We move it to the
> > >>>>>>>> very first processor (KtableSource)
> > >>>>>>>> and therefore cant present a proper RecordContext.
> > >>>>>>>
> > >>>>>>> With regard to `builder.table().filter()`:
> >>>>>>>
> > >>>>>>> I see your point that it would be good to be able to apply the
> > >>>>>>> filter() first to reduce the state store size of the table. But how
> > >>>>>>> is this related to KIP-159?
> >>>>>>>
> > >>>>>>> Btw: with KIP-182, I am wondering if this would not be possible, by
> > >>>>>>> putting a custom dummy store into the table and materializing the
> > >>>>>>> filter result afterwards? It's not a nice way to do it, but it seems
> > >>>>>>> to be possible.
> > >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 11/23/17 4:56 AM, Jan Filipiak wrote:
> >>>>>>>
> > >>>>>>>> The comment is valid. It falls exactly into this topic, it has
> > >>>>>>>> exactly to do with this!
> > >>>>>>>> Even when we have a stateful operation last, we move it to the very
> > >>>>>>>> first processor (KTableSource)
> > >>>>>>>> and therefore can't present a proper RecordContext.
> >>>>>>>>
> >>>>>>>> Regarding the other Jiras you are referring to. They harm the
> >>>>>>>> project
> >>>>>>>> more than they do good!
> >>>>>>>> There is no need for this kind of optimizer and meta
> representation
> >>>>>>>> and
> >>>>>>>> what not. I hope they
> >>>>>>>> never get implemented.
> >>>>>>>>
> >>>>>>>> Best Jan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 22.11.2017 14:44, Damian Guy wrote:
> >>>>>>>>
> > >>>>>>>>> Jan, I think your comment with respect to filtering is valid,
> > >>>>>>>>> though not for this KIP. We have separate JIRAs for topology
> > >>>>>>>>> optimization, which this falls into.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Damian
> >>>>>>>>>
> >>>>>>>>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> > >>>>>>>>>> Jan,
> > >>>>>>>>>>
> > >>>>>>>>>> Not sure I understand your argument that "we still going to
> > >>>>>>>>>> present change.oldValue to the filter even though the record
> > >>>>>>>>>> context() is for change.newValue". Are you referring to
> > >>>>>>>>>> `KTableFilter#process()`? If yes, could you point me to the LOC
> > >>>>>>>>>> you are concerned about?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <
> >>>>>>>>>> jan.filip...@trivago.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> > >>>>>>>>>>> a remark of mine that got missed during migration:
> > >>>>>>>>>>>
> > >>>>>>>>>>> There is this problem that even though we have
> > >>>>>>>>>>> source.table.filter.join
> > >>>>>>>>>>> the statefulness happens at the table step, not at the join
> > >>>>>>>>>>> step. In a filter we still going to present change.oldValue to
> > >>>>>>>>>>> the filter even though the record context() is for
> > >>>>>>>>>>> change.newValue. I would go as far as applying the filter before
> > >>>>>>>>>>> the table processor. Not just to get KIP-159, but because I
> > >>>>>>>>>>> think it's a side effect of a non-ideal topology layout. If I
> > >>>>>>>>>>> can filter 99% of my records, my state could be way smaller.
> > >>>>>>>>>>> Also widely escalates the context of the KIP
> >>>>>>>>>>>
> >>>>>>>>>>> I can only see upsides of executing the filter first.
> >>>>>>>>>>>
> >>>>>>>>>>> Best Jan
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 20.11.2017 22:22, Matthias J. Sax wrote:
> >>>>>>>>>>>
> > >>>>>>>>>>>> I am moving this back to the DISCUSS thread... Last 10 emails
> > >>>>>>>>>>>> were sent to VOTE thread.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Copying Guozhang's last summary below. Thanks for this summary.
> > >>>>>>>>>>>> Very comprehensive!
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> It seems we all agree that the current implementation of the
> > >>>>>>>>>>>> context at PAPI level is ok, but we should not leak it into DSL.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thus, we can go with (2) or (3), where (3) is an extension to
> > >>>>>>>>>>>> (2) carrying the context to more operators than just sources.
> > >>>>>>>>>>>> It also seems that we all agree that many-to-one operations
> > >>>>>>>>>>>> void the context.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I still think that just going with plain (2) is too
> > >>>>>>>>>>>> restrictive -- but I am also fine if we don't go with the full
> > >>>>>>>>>>>> proposal of (3).
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Also note that the two operators filter() and filterNot() don't
> > >>>>>>>>>>>> modify the record and thus for both, it would be absolutely
> > >>>>>>>>>>>> valid to keep the context.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I personally would keep the context for at least all one-to-one
> > >>>>>>>>>>>> operators. One-to-many is debatable and I am fine to not carry
> > >>>>>>>>>>>> the context further: at least the offset information is
> > >>>>>>>>>>>> questionable for this case -- note though, that semantically,
> > >>>>>>>>>>>> the timestamp is inherited via one-to-many, and I also think
> > >>>>>>>>>>>> this applies to "topic" and "partition". Thus, I think it's
> > >>>>>>>>>>>> still valuable information we can carry downstream.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> > >>>>>>>>>>>>> Jan: which approach are you referring to as "the approach that
> > >>>>>>>>>>>>> is on the table would be perfect"?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Note that in today's PAPI layer we are already effectively
> > >>>>>>>>>>>>> exposing the record context which has the issues that we have
> > >>>>>>>>>>>>> been discussing right now, and its semantics always refer to
> > >>>>>>>>>>>>> the "processing record" at hand. More specifically, we can
> > >>>>>>>>>>>>> think of processing a record a bit differently:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1) the record traverses the topology from source to sink; it
> > >>>>>>>>>>>>> may be transformed into a new object or even generate multiple
> > >>>>>>>>>>>>> new objects (think: branch) along the traversal. And the record
> > >>>>>>>>>>>>> context refers to this processing record. Here the "lifetime"
> > >>>>>>>>>>>>> of the record lasts for the entire topology traversal and any
> > >>>>>>>>>>>>> new records of this traversal are treated as different
> > >>>>>>>>>>>>> transformed values of this record (this applies to join and
> > >>>>>>>>>>>>> aggregations as well).
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2) the record being processed is wiped out in the first
> > >>>>>>>>>>>>> operator after the source, and NEW records are forwarded to
> > >>>>>>>>>>>>> downstream operators. I.e. each record only lives between two
> > >>>>>>>>>>>>> adjacent operators; once it has reached the new operator its
> > >>>>>>>>>>>>> lifetime has ended and new records are generated.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I think in the past we have talked about Streams under both
> > >>>>>>>>>>>>> contexts, and we do not have a clear agreement. I agree that 2)
> > >>>>>>>>>>>>> is logically more understandable for users as it does not leak
> > >>>>>>>>>>>>> any internal implementation details (e.g. for stream-table
> > >>>>>>>>>>>>> joins, the table record's traversal ends at the join operator
> > >>>>>>>>>>>>> as it is only materialized, while the stream record's traversal
> > >>>>>>>>>>>>> goes through the join operator and further down until the
> > >>>>>>>>>>>>> sinks). However, if we are going to follow interpretation 2)
> > >>>>>>>>>>>>> above, then even for non-stateful operators we would not
> > >>>>>>>>>>>>> inherit record context. What we're discussing now seems to
> > >>>>>>>>>>>>> imply a third semantics:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3) a record would traverse "through" one-to-one (non-stateful)
> > >>>>>>>>>>>>> operators, will "replicate" at one-to-many (non-stateful)
> > >>>>>>>>>>>>> operators (think: "mapValues") and will "end" at many-to-one
> > >>>>>>>>>>>>> (stateful) operators where NEW records will be generated and
> > >>>>>>>>>>>>> forwarded to the downstream operators.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Just wanted to lay the ground for discussions so we are all on
> > >>>>>>>>>>>>> the same page before chatting more.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks a lot for correcting. It is a leftover from the past
> > >>>>>>>>>>>>> designs when punctuate() was not deprecated.
> > >>>>>>>>>>>>> I corrected.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
> >>>>>>>>>>>>> <matth...@confluent.io>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I just re-read the KIP.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> One minor comment: we don't need to introduce any deprecated
> > >>>>>>>>>>>>>> methods. Thus, RichValueTransformer#punctuate can be removed
> > >>>>>>>>>>>>>> completely instead of introducing it as deprecated.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Otherwise looks good to me.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for being so patient!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
> >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Jeyhun,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We
> > >>>>>>>>>>>>>>> should think carefully if we should add this functionality
> > >>>>>>>>>>>>>>> to the DSL layer moving forward, since from what we
> > >>>>>>>>>>>>>>> discovered working on it the conclusion is that it would
> > >>>>>>>>>>>>>>> require revamping the public APIs quite a lot, and it's not
> > >>>>>>>>>>>>>>> clear if it is a better trade-off than asking users to call
> > >>>>>>>>>>>>>>> process() instead.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Guozhang
> > >>>>>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy
> >>>>>>>>>>>>>>> <damian....@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hi Jeyhun, thanks, looks good.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Do we need to remove the line that says:
> > >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>         - on-demand commit() feature
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <
> >>>>>>>>>>>>>>>> je.kari...@gmail.com>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> I removed the 'commit()' feature, as we discussed. It
> > >>>>>>>>>>>>>>>>> simplified the overall design of KIP a lot.
> > >>>>>>>>>>>>>>>>> If it is ok, I would like to start a VOTE thread.
> > >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <
> >>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks. I understand what you are saying, but I don't
> > >>>>>>>>>>>>>>>>>> agree that
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> but also we need a commit() method
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I would just not provide `commit()` at DSL level and
> > >>>>>>>>>>>>>>>>>> close the corresponding Jira as "not a problem" or
> > >>>>>>>>>>>>>>>>>> similar.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi Matthias,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Thanks for your comments. I agree that this is not the
> > >>>>>>>>>>>>>>>>>>> best way to do it. A bit of history behind this design:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Prior to doing this, I tried to provide ProcessorContext
> > >>>>>>>>>>>>>>>>>>> itself as an argument in Rich interfaces. However, we
> > >>>>>>>>>>>>>>>>>>> don't want to give users that flexibility and “power”.
> > >>>>>>>>>>>>>>>>>>> Moreover, ProcessorContext contains processor level
> > >>>>>>>>>>>>>>>>>>> information and not Record level info. The only thing we
> > >>>>>>>>>>>>>>>>>>> need in ProcessorContext is the commit() method.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> So, as far as I understood, we need record context
> > >>>>>>>>>>>>>>>>>>> (offset, timestamp, etc.) but also we need a commit()
> > >>>>>>>>>>>>>>>>>>> method (we don't want to provide ProcessorContext as a
> > >>>>>>>>>>>>>>>>>>> parameter so users can use ProcessorContext.commit()).
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> As a result, I thought to “propagate” the commit() call
> > >>>>>>>>>>>>>>>>>>> from RecordContext to ProcessorContext().
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> If there is a misunderstanding in the
> > >>>>>>>>>>>>>>>>>>> motivation/discussion of the KIP/included jiras please
> > >>>>>>>>>>>>>>>>>>> let me know.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Cheers,
> > >>>>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <
> >>>>>>>>>>>>>>>>>>> matth...@confluent.io
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> I am personally still not convinced that we should add
> > >>>>>>>>>>>>>>>>>>>> `commit()` at all.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> @Guozhang: you created the original Jira. Can you
> > >>>>>>>>>>>>>>>>>>>> elaborate a little bit? Isn't requesting commits a low
> > >>>>>>>>>>>>>>>>>>>> level API that should not be exposed in the DSL? Just
> > >>>>>>>>>>>>>>>>>>>> want to understand the motivation better. Why would
> > >>>>>>>>>>>>>>>>>>>> anybody that uses the DSL ever want to request a
> > >>>>>>>>>>>>>>>>>>>> commit? To me, requesting commits is useful if you
> > >>>>>>>>>>>>>>>>>>>> manipulated state explicitly, i.e., via Processor API.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> Also, for the solution: it seems rather unnatural to
> > >>>>>>>>>>>>>>>>>>>> me that we add `commit()` to `RecordContext` -- from
> > >>>>>>>>>>>>>>>>>>>> my understanding, `RecordContext` is a helper object
> > >>>>>>>>>>>>>>>>>>>> that provides access to record meta data. Requesting a
> > >>>>>>>>>>>>>>>>>>>> commit is something quite different. Additionally, a
> > >>>>>>>>>>>>>>>>>>>> commit does not commit a specific record but a
> > >>>>>>>>>>>>>>>>>>>> `RecordContext` is for a specific record.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> To me, this does not seem to be a sound API design if
> > >>>>>>>>>>>>>>>>>>>> we follow this path.
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for your suggestions.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I have some comments, to make sure that there is no
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> misunderstanding.
> >>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in
> >>>>>>>>>>>>>>>>>>>>> ProcessorContext, to enforce user to consolidate this
> >>>>>>>>>>>>>>>>>>>>> call as "processorContext.recordContext().commit()".
> >>>>>>>>>>>>>>>>>>>>> And internal implementation of
> >>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl`
> >>>>>>>>>>>>>>>>>>>>> is also changed to this call.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> - I think we should not deprecate
> >>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()`. The main reason we
> >>>>>>>>>>>>>>>>>>>>> introduce `commit()` in `RecordContext` is that
> >>>>>>>>>>>>>>>>>>>>> `RecordContext` is the object provided in Rich
> >>>>>>>>>>>>>>>>>>>>> interfaces. So if a user wants to commit, there should
> >>>>>>>>>>>>>>>>>>>>> be some method inside `RecordContext` to do so.
> >>>>>>>>>>>>>>>>>>>>> Internally, `RecordContext.commit()` calls
> >>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` (see the last code snippet
> >>>>>>>>>>>>>>>>>>>>> in KIP-159):
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> @Override
> >>>>>>>>>>>>>>>>>>>>> public void process(final K1 key, final V1 value) {
> >>>>>>>>>>>>>>>>>>>>>     // recordContext initialization is added in this KIP
> >>>>>>>>>>>>>>>>>>>>>     recordContext = new RecordContext() {
> >>>>>>>>>>>>>>>>>>>>>         @Override
> >>>>>>>>>>>>>>>>>>>>>         public void commit() {
> >>>>>>>>>>>>>>>>>>>>>             context().commit();
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>         @Override
> >>>>>>>>>>>>>>>>>>>>>         public long offset() {
> >>>>>>>>>>>>>>>>>>>>>             return context().recordContext().offset();
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>         @Override
> >>>>>>>>>>>>>>>>>>>>>         public long timestamp() {
> >>>>>>>>>>>>>>>>>>>>>             return context().recordContext().timestamp();
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>         @Override
> >>>>>>>>>>>>>>>>>>>>>         public String topic() {
> >>>>>>>>>>>>>>>>>>>>>             return context().recordContext().topic();
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>         @Override
> >>>>>>>>>>>>>>>>>>>>>         public int partition() {
> >>>>>>>>>>>>>>>>>>>>>             return context().recordContext().partition();
> >>>>>>>>>>>>>>>>>>>>>         }
> >>>>>>>>>>>>>>>>>>>>>     };
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in
> >>>>>>>>>>>>>>>>>>>>> this case IMO.
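
The delegation described above can be tried out in isolation. What follows is
only a minimal sketch, assuming simplified stand-in types: `RecordContext`
and `ProcessorContext` here are hypothetical interfaces written for this
example, not the real Kafka Streams ones, and `FakeProcessorContext`,
`MetadataOnlyContext` and `DelegatingRecordContext` are invented names. The
metadata-only context rejects `commit()`, loosely following the wiki
statement quoted in this thread.

```java
// Hypothetical, simplified stand-ins for the types discussed in this thread.
// They are NOT the real org.apache.kafka.streams interfaces.
final class CommitDelegationSketch {
    public static void main(final String[] args) {
        final FakeProcessorContext context =
            new FakeProcessorContext("orders", 3, 42L, 1_000L);
        final RecordContext recordContext = new DelegatingRecordContext(context);

        recordContext.commit(); // forwarded to context.commit()

        System.out.println("commit requests: " + context.commitRequests);
        System.out.println("record: " + recordContext.topic() + "/"
            + recordContext.partition() + "/" + recordContext.offset());
    }
}

interface RecordContext {
    void commit();
    long offset();
    long timestamp();
    String topic();
    int partition();
}

interface ProcessorContext {
    void commit();
    RecordContext recordContext();
}

/** Metadata-only context; commit() is rejected here (assumption for this sketch). */
final class MetadataOnlyContext implements RecordContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    MetadataOnlyContext(final String topic, final int partition,
                        final long offset, final long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public void commit() {
        throw new UnsupportedOperationException("metadata-only context");
    }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }
    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}

/** Test double that counts commit requests and exposes fixed record metadata. */
final class FakeProcessorContext implements ProcessorContext {
    int commitRequests = 0;
    private final RecordContext metadata;

    FakeProcessorContext(final String topic, final int partition,
                         final long offset, final long timestamp) {
        this.metadata = new MetadataOnlyContext(topic, partition, offset, timestamp);
    }

    @Override public void commit() { commitRequests++; }
    @Override public RecordContext recordContext() { return metadata; }
}

/** The object handed to a Rich function: every call is forwarded. */
final class DelegatingRecordContext implements RecordContext {
    private final ProcessorContext context;

    DelegatingRecordContext(final ProcessorContext context) {
        this.context = context;
    }

    // commit() goes to the processor context, not to the metadata object.
    @Override public void commit() { context.commit(); }
    @Override public long offset() { return context.recordContext().offset(); }
    @Override public long timestamp() { return context.recordContext().timestamp(); }
    @Override public String topic() { return context.recordContext().topic(); }
    @Override public int partition() { return context.recordContext().partition(); }
}
```

In this shape `ProcessorContext.commit()` has to stay available, because the
wrapper has nothing else to forward the commit request to.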
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
> >>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the
> >>>>>>>>>>>>>>>>>>>>> commit call itself.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
> >>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to
> >>>>>>>>>>>>>>>>>>>>> "transfer" the `ProcessorContext.commit()` call to Rich
> >>>>>>>>>>>>>>>>>>>>> interfaces, to support user-specific committing.
> >>>>>>>>>>>>>>>>>>>>> To do so, we introduce a `commit()` method in
> >>>>>>>>>>>>>>>>>>>>> `RecordContext` only to call
> >>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` inside (see the above code
> >>>>>>>>>>>>>>>>>>>>> snippet).
> >>>>>>>>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with
> >>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext` at all, and we leave all its
> >>>>>>>>>>>>>>>>>>>>> methods as they are.
> >>>>>>>>>>>>>>>>>>>>> In this KIP, we made `RecordContext` the parent class
> >>>>>>>>>>>>>>>>>>>>> of `ProcessorRecordContext` just because they share
> >>>>>>>>>>>>>>>>>>>>> quite a number of methods, and it is logical to enable
> >>>>>>>>>>>>>>>>>>>>> inheritance between those two.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call
> >>>>>>>>>>>>>>>>>>>>> to a commit() method, is valid only within
> >>>>>>>>>>>>>>>>>>>>> RecordContext interface (at least for now), we throw an
> >>>>>>>>>>>>>>>>>>>>> exception in ProcessorRecordContext.commit()." and the
> >>>>>>>>>>>>>>>>>>>>> code snippet below would need to be updated as well.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> - I think the above explanation covers this as well.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I want to gain some speed on this KIP, as it has gone
> >>>>>>>>>>>>>>>>>>>>> through many changes based on user/developer needs,
> >>>>>>>>>>>>>>>>>>>>> both documentation- and implementation-wise.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Jeyhun
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang
> >>>>>>>>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks for the information, Jeyhun. I had also
> >>>>>>>>>>>>>>>>>>>>>> forgotten about KAFKA-3907 with this KIP.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what
> >>>>>>>>>>>>>>>>>>>>>> we agreed before, to add the commit() call to
> >>>>>>>>>>>>>>>>>>>>>> `RecordContext`. A few minor tweaks on its
> >>>>>>>>>>>>>>>>>>>>>> implementation:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in
> >>>>>>>>>>>>>>>>>>>>>> ProcessorContext, to enforce user to consolidate this
> >>>>>>>>>>>>>>>>>>>>>> call as "processorContext.recordContext().commit()".
> >>>>>>>>>>>>>>>>>>>>>> And internal implementation of
> >>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl`
> >>>>>>>>>>>>>>>>>>>>>> is also changed to this call.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
> >>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement
> >>>>>>>>>>>>>>>>>>>>>> the commit call itself.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However,
> >>>>>>>>>>>>>>>>>>>>>> call to a commit() method, is valid only within
> >>>>>>>>>>>>>>>>>>>>>> RecordContext interface (at least for now), we throw
> >>>>>>>>>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and
> >>>>>>>>>>>>>>>>>>>>>> the code snippet below would need to be updated as
> >>>>>>>>>>>>>>>>>>>>>> well.
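
Tweaks 1 and 2 above can be pictured with a small, self-contained sketch.
This is only an illustration under stated assumptions: `Task` and its
`requestCommit()` method are hypothetical, and the `ProcessorRecordContext`
and `ProcessorContextImpl` classes below are simplified stand-ins that merely
borrow the names used in this thread; they are not the real Kafka Streams
classes. The record context holds the task reference and requests the commit
itself, while the deprecated `ProcessorContextImpl.commit()` forwards to it.

```java
// Hypothetical, simplified stand-ins; NOT the real Kafka Streams classes.
final class TaskBackedCommitSketch {
    public static void main(final String[] args) {
        final Task task = new Task();
        final ProcessorContextImpl context = new ProcessorContextImpl(task);

        context.setCurrentRecord(7L);      // a record arrives for processing
        context.recordContext().commit();  // the consolidated call from tweak 1

        System.out.println("commit requested: " + task.commitRequested());
    }
}

/** Stand-in for a stream task; a commit is a task-level request. */
final class Task {
    private boolean commitRequested;

    void requestCommit() { commitRequested = true; }
    boolean commitRequested() { return commitRequested; }
}

interface RecordContext {
    void commit();
    long offset();
}

/** Tweak 2: the impl class keeps the task reference and commits by itself. */
final class ProcessorRecordContext implements RecordContext {
    private final Task task;
    private final long offset;

    ProcessorRecordContext(final Task task, final long offset) {
        this.task = task;
        this.offset = offset;
    }

    // The request is not tied to this record; it asks the whole task to commit.
    @Override public void commit() { task.requestCommit(); }
    @Override public long offset() { return offset; }
}

final class ProcessorContextImpl {
    private final Task task;
    private ProcessorRecordContext recordContext;

    ProcessorContextImpl(final Task task) { this.task = task; }

    /** Must be called before recordContext() or commit() is used. */
    void setCurrentRecord(final long offset) {
        recordContext = new ProcessorRecordContext(task, offset);
    }

    RecordContext recordContext() { return recordContext; }

    /** Tweak 1: kept for compatibility, forwards to the record context. */
    @Deprecated
    void commit() { recordContext().commit(); }
}
```

The sketch also makes the earlier objection in this thread visible: the
`commit()` on the per-record object ends up as a request on the task, not a
commit of that one record.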
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax
> >>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Fair point. This is a long discussion and I totally
> >>>>>>>>>>>>>>>>>>>>>>> forgot that we discussed this.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Seems I changed my opinion about including
> >>>>>>>>>>>>>>>>>>>>>>> KAFKA-3907...
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Happy to hear what others think.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit
> >>>>>>>>>>>>>>>>>>>>>>>> long in this thread. I proposed the related issue
> >>>>>>>>>>>>>>>>>>>>>>>> in the related KIP discuss thread [1] and got an
> >>>>>>>>>>>>>>>>>>>>>>>> approval [2,3].
> >>>>>>>>>>>>>>>>>>>>>>>> Maybe I misunderstood.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
> >>>>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
> >>>>>>>>>>>>>>>>>>>>>>>> [3]
> >>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
> >>
> >>
>
>
> --
> -- Guozhang
>
