Yes, I've already created one: https://github.com/apache/kafka/pull/4955
On Mon, 14 May 2018, 17:55 Guozhang Wang, <wangg...@gmail.com> wrote:

> Thanks Jorge, that sounds good to me.
>
> Also please feel free to send out the PR for reviews while the KIP is being voted on.
>
> Guozhang
>
> On Mon, May 14, 2018 at 8:31 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>
> > Thanks for your feedback everyone!
> >
> > If there are no more comments on this KIP, I think we can open the VOTE thread.
> >
> > Cheers,
> > Jorge.
> >
> > On Sat, May 12, 2018 at 2:02, Guozhang Wang (<wangg...@gmail.com>) wrote:
> >
> > > Yeah I'm only talking about the DSL part (i.e. how stateful / stateless operators' default inheritance protocol would be promised) to be managed with KIP-159.
> > >
> > > For allowing users to override the default behavior in PAPI, that would be in a different KIP.
> > >
> > > Guozhang
> > >
> > > On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > I am actually not sure about this. Because it's about the semantics at PAPI level, but KIP-159 targets the DSL, it might actually be better to have a separate KIP?
> > > >
> > > > -Matthias
> > > >
> > > > On 5/11/18 9:26 AM, Guozhang Wang wrote:
> > > > > That's a good question. I think we can manage this in KIP-159. I will go ahead and try to augment that KIP together with the original author Jeyhun.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > >> Thanks Guozhang and Matthias! I do also agree with this way of handling headers inheritance. I will add them to the KIP doc.
> > > > >>
> > > > >>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
> > > > >>
> > > > >> About this, should this be managed in KIP-159 or a new one?
> > > > >>
> > > > >> On Thu, May 10, 2018 at 17:46, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > >>
> > > > >>> Thanks Guozhang! Sounds good to me!
> > > > >>>
> > > > >>> -Matthias
> > > > >>>
> > > > >>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
> > > > >>>> Thanks for your thoughts Matthias. I think if we do want to bring KIP-244 into 2.0 then we need to keep its scope small and well defined. For that I'm proposing:
> > > > >>>>
> > > > >>>> 1. Make the inheritance implementation of headers consistent with what we had with other record context fields, i.e. pass through the record context in `context.forward()`. Note that within a processor node, users can already manipulate the Headers with the given APIs, so at the time of forwarding, the library can just copy whatever is left / updated to the next processor node.
> > > > >>>>
> > > > >>>> 2. In the sink node, where a record is being sent to the Kafka topic, we should consider the following:
> > > > >>>>
> > > > >>>> a. For sink topics, we will set the headers into the producer record.
> > > > >>>> b. For repartition topics, we will set the headers into the producer record.
> > > > >>>> c. For changelog topics, we will drop the headers in the producer record since they will not be used in restoration and not stored in the state store either.
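
To make the two rules proposed above concrete, here is a minimal sketch of the inheritance behavior. It is not the Kafka Streams implementation: `HeaderInheritanceSketch`, `TopicKind`, `forward` and `headersForTopic` are hypothetical names, and headers are modeled as a plain `Map<String, String>` instead of Kafka's `Headers` type (which holds byte-array values and allows duplicate keys).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in types for illustration; this is not the Kafka Streams API.
public class HeaderInheritanceSketch {

    // The three kinds of topics a sink node may write to, per rule 2.
    enum TopicKind { SINK, REPARTITION, CHANGELOG }

    // Rule 1: forwarding copies whatever headers are left / updated
    // in the record context to the next processor node.
    static Map<String, String> forward(Map<String, String> contextHeaders) {
        return new LinkedHashMap<>(contextHeaders);
    }

    // Rule 2: decide which headers go into the producer record.
    static Map<String, String> headersForTopic(TopicKind kind, Map<String, String> contextHeaders) {
        switch (kind) {
            case SINK:
            case REPARTITION:
                // 2.a and 2.b: set the headers into the producer record.
                return new LinkedHashMap<>(contextHeaders);
            case CHANGELOG:
            default:
                // 2.c: changelog records are written without headers.
                return new LinkedHashMap<>();
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("trace-id", "abc123");
        // A processor may mutate the headers before forwarding.
        headers.put("hop", "1");

        Map<String, String> downstream = forward(headers);
        System.out.println("sink:        " + headersForTopic(TopicKind.SINK, downstream));
        System.out.println("repartition: " + headersForTopic(TopicKind.REPARTITION, downstream));
        System.out.println("changelog:   " + headersForTopic(TopicKind.CHANGELOG, downstream));
    }
}
```

The point of the sketch is that there is a single default (copy on forward) and only one exception (changelog topics), which is what keeps the scope of the proposal small.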
> > > > >>>>
> > > > >>>> We can discuss extending the current protocol, how to enable users to override those rules, and how to expose them in the DSL layer in a future KIP.
> > > > >>>>
> > > > >>>> Guozhang
> > > > >>>>
> > > > >>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>
> > > > >>>>> Guozhang,
> > > > >>>>>
> > > > >>>>> if you advocate to forward headers by default, it might be a better default strategy to forward the headers for all operators (similar to topic/partition/offset metadata). It's usually harder for users to reason about different cases and thus I would prefer to have consistent behavior, i.e., only one default strategy instead of introducing different cases.
> > > > >>>>>
> > > > >>>>> Btw: My argument about dropping headers by default only implies that users need to copy the headers explicitly to the output records in their code if they want to inspect them later -- it does not imply that headers cannot be forwarded downstream. (Not sure if this was clear).
> > > > >>>>>
> > > > >>>>> I am also ok with copying by default though (for me, it's a 51/49 preference for dropping by default only).
> > > > >>>>>
> > > > >>>>> -Matthias
> > > > >>>>>
> > > > >>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
> > > > >>>>>> Hi Matthias,
> > > > >>>>>>
> > > > >>>>>> My concern with setting `null` in all cases is that it would make headers not very useful in KIP-244, because headers will only be available at the source stream / table, but not in any of the following instances. In practice users may be more likely to look into the headers later in the pipeline. Personally I'd suggest we pass the headers for all stateless operators in DSL and everywhere in PAPI's context.forward(). For repartition topics and sink topics, we also set them in the produced records accordingly; for changelog topics, we do not set them since they are not going to be used anywhere in the store.
> > > > >>>>>>
> > > > >>>>>> Guozhang
> > > > >>>>>>
> > > > >>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>>
> > > > >>>>>>> I agree that we should not block this KIP if possible. Nevertheless, we should try to get a reasonable default strategy for inheriting the headers so we don't need to change it later on.
> > > > >>>>>>>
> > > > >>>>>>> Let's see what others think. I still tend slightly to set to `null` by default for all cases. If the default strategy is different for different operators as you suggest, it might be confusing to users. IMHO, the default behavior should be as simple as possible.
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
> > > > >>>>>>>> Matthias, thanks for sharing your opinions on the inheritance protocol of the record context. I'm thinking maybe we should make this discussion a separate KIP by itself? If yes, then KIP-244's scope would be smaller, and within KIP-244 we can have a simple inheritance rule of setting it to null when 1) going through stateful operators and 2) sending to any topics.
> > > > >>>>>>>>
> > > > >>>>>>>> Guozhang
> > > > >>>>>>>>
> > > > >>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Making the inheritance protocol a public contract seems reasonable to me.
> > > > >>>>>>>>>
> > > > >>>>>>>>> In the current implementation, all output records inherit the offset, timestamp, topic, and partition metadata from the input record. We already added an API to change the timestamp explicitly for the output record though.
> > > > >>>>>>>>>
> > > > >>>>>>>>> I think it makes sense to keep the inheritance of offset, topic, and partition. For headers, it's worth discussing. I see arguments for two strategies: (1) inherit by default, (2) set `null` by default. Independent of the default behavior, we should add an API to set headers for output records explicitly though (similar to the "set timestamp API").
> > > > >>>>>>>>>
> > > > >>>>>>>>> From my point of view, timestamp/headers are a different "class/category" of data/metadata than topic/partition/offset. For the first category, it makes sense to manipulate them and it's more than "plain metadata"; especially the timestamp. For the second category it does not make sense to manipulate it, and to me topic/partition/offset is pure metadata only---strictly speaking, it's even questionable if output records should have any value for topic/partition/offset in the first place, or if they should be `null`, because those attributes only make sense for source records that are consumed from a topic directly. On the other hand, if we make this difference explicit, it might be useful information for the user to track the current topic/partition/offset of the original source record.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Furthermore, to me, timestamps and headers are somewhat different, too. For stream processing it's required that every record has a timestamp; thus, it makes sense to inherit the input record timestamp by default (a timestamp is not really metadata but actually equally important to key and value from my point of view). Headers however are optional, and thus inheriting them is not really required. It might be convenient though: for example, imagine a simple "filter-only" application -- it would be cumbersome for users to explicitly copy the headers from the input records to the output records -- it seems to be unnecessary boilerplate code. On the other hand, for any other more complex use case, it's questionable to inherit headers---note that headers would be written to the output topics, increasing the size of the messages. Overall, I am not sure which default strategy might be the better one for headers. Is there a convincing argument for either one of them? I slightly tend to think that using `null` as default might be better.
> > > > >>>>>>>>>
> > > > >>>>>>>>> Last, we could also make the default behavior configurable. Something like `inherit.record.headers=true/false` with default value "false". This would allow people to opt-in for auto-header-inheritance. Just an idea I wanted to add to the discussion---not sure if it's a good one.
> > > > >>>>>>>>>
> > > > >>>>>>>>> -Matthias
> > > > >>>>>>>>>
> > > > >>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > > > >>>>>>>>>> Hello Jorge,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Today we do not have a clear inheritance protocol for other fields of RecordContext yet: although internally we do have some criterion on topic/partition/offset and timestamp, they are not explicitly exposed to users.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I think we still need to have a defined protocol for headers itself, but I agree that it is better scoped outside of this KIP, since this inheritance protocol itself for all the fields of RecordContext would better be a separate KIP. We can document this clearly in the wiki page.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Guozhang
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <garcia.florian.pe...@gmail.com> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hi,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> For me this is a great first step to have Headers in streaming. My current use case is about distributed tracing (Zipkin) and with the headers in the processorContext() I'll be able to manage that for most cases. KIP-159 should follow after this, but this is where all the major questions will arise for stateful operations (as Guozhang said).
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the work on this Jorge.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks Guozhang and John for your feedback.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
> > > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
> > > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Yes, I will point to it.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this on the KIP.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Fixed, thanks.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> As headers are exposed in the `ProcessorContext`, and headers will be sent downstream, they can be mutated (add/remove headers).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` if they have used them. Or it could be something like a boolean config property that manages this. I would like to hear feedback here.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Probably this would be solved once KIP-159 is implemented and supporting Headers.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the original proposal. The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`. On top of this implementation, we can design how filter/map/join will be handled. Probably following the KIP-159 approach.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>) wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks for the written KIP!
> > > > >>>>>>>>>>>>> Made a pass over it and left some comments (some of them overlapped with John's):
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of headers in our topology:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be straight-forward.
> > > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and joins?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Guozhang
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks for the design work.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor API will help contain the design and implementation complexity.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> In the KIP, it mentions that the headers would be available in the ProcessorContext, (like "context.headers()"). It also says that implementers would need to implement the method "void process(K key, V value, Headers headers);". I think maybe you meant to remove the proposal to modify "process", since it wouldn't be necessary in conjunction with the ProcessorContext change, and it's not represented in your PR.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks again for the KIP,
> > > > >>>>>>>>>>>>>> -John
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hi Matthias,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP and created a PR.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Looking forward to your feedback,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Jorge,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I would like to unblock this KIP to make some progress. The tricky question of this work seems to be how to expose headers at DSL level. This is related to KIP-149 and KIP-159. However, for Processor API, it seems to be rather straight forward to add headers to the API.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add header support for Processor API only as a first step. If this is done, we can see in a second step, how to add headers at DSL level.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> WDYT about this proposal?
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP accordingly. Note that we have two JIRAs that are duplicates atm. We can scope them accordingly: one for PAPI only, and second as a dependent JIRA for DSL.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > >>>>>>>>>>>>>>>>> Thanks for your feedback!
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support groupBy, but I think it is not necessary. It should be enough with mapping headers to key/value and then group using current KeyValue structure.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure, hence considering headers as part of stateful operations will not fit in this approach and would increase complexity (I cannot think of a use-case that needs this).
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Probably I rushed a bit proposing this change; I was not aware of KIP-159 or KAFKA-5632. If KIP-159 is adopted and we reduce this KIP to adding Headers to RecordContext, that will be enough, but I'm not sure about the scope of KIP-159. If it includes stateful operations, it will be difficult to implement, as stated in 2.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Cheers,
> > > > >>>>>>>>>>>>>>>>> Jorge.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge,
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful with adding new overloads as this contradicts the work done via KIP-182.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and KIP-159. Are you aware of them? Both have quite long DISCUSS threads, but it might be worth browsing through them.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> A few further questions:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> - why do you want to add the headers to `KeyValue`? I am not sure if we should consider headers as optional metadata and add it to `RecordContext` similar to timestamp, offset, etc. only
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> - You only include stateless single-record transformations at the DSL level. Do you suggest that all other operators just drop headers on the floor?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> - Why do you only want to put headers into in-memory and cache but not RocksDB store? What do you mean by "pass through"?
> > > > >>>>>>>>>>>>>>>>>> IMHO, all stores should behave the same at DSL level.
> > > > >>>>>>>>>>>>>>>>>> -> if we store the headers in the state stores, what is the upgrade path?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> - Why do we need to store record headers in state in the first place, if we exclude stateful operators at DSL level?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you choose?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> -Matthias
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > >>>>>>>>>>>>>>>>>>> Jorge,
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others in the community have been interested in getting into Kafka Streams.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial question.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in this KIP we are increasing them again.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but I'm wondering if there is something else we can do to cut down on the overloads introduced.
> > I don't have any sound suggestions ATM, so I'll have to think about it
> > some more, but I wanted to put the thought out there.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya
> > <quilcate.jo...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I have created a KIP to add Record Headers support to Kafka Streams API:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > >
> > > The main goal is to be able to use headers to filter, map and process
> > > records as streams. Stateful processing (joins, windows) is not
> > > considered.
> > >
> > > Proposed changes/Draft:
> > > https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > >
> > > Feedback and suggestions are more than welcome.
> > >
> > > Cheers,
> > >
> > > Jorge.