Hi Dev team,

I'd like to revamp the KIP again:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

- The reference implementation now uses the latest `Processor` API from
KIP-478 for both processors backing the proposed KStream API changes:
https://github.com/apache/kafka/pull/10265/files
- It still proposes extending the `To` class for backwards compatibility.

Looking forward to your feedback.

Regards,
Jorge.

On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi everyone!
>
> I'd like to revamp this KIP. I have made some significant changes to the
> scope:
> - Added `mapRecordValue` to map not only headers, but other record
> metadata: topic name, partition, offset, and timestamp into a new type
> `RecordValue<V>`.
> - Added a serde for `RecordValue` to support stateful operations.
> - Added `setRecordHeaders` to apply headers to records crossing the stream.
> - Added headers to `To` to update headers via `context.forward(k, v, to)`.
>
> New link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>
> Looking forward to your feedback,
>
> Cheers and stay safe,
> Jorge.
>
> On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
>> Thanks Sophie! Haven't followed KIP-478 but sounds great.
>> I'll be happy to help on that migration to the new PAPI if it's still an
>> open issue. We can bump this KIP after that.
>>
>> Cheers,
>> Jorge.
>>
>> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io>
>> wrote:
>>
>>> I *think* that the `To` Matthias was referring to was not KStream#to but
>>> the To class, which is accepted as a possible parameter of
>>> ProcessorContext#forward (correct me if wrong).
>>>
>>> This was on the old ProcessorContext interface, which has now been
>>> replaced with the new api.ProcessorContext in KIP-478. In the new
>>> interface we've moved away from the forward signatures that accept a
>>> separate key or value or timestamp or To, and wrapped all of these into a
>>> single Record class. This new Record class has the headers as a field, so
>>> it seems like KIP-478 has happened to solve the lack of support for
>>> Headers in the PAPI along the way.
>>>
>>> This is all somewhat recent, and probably wasn't yet sorted out at the
>>> time of Matthias' last reply. But given how this worked out it seems like
>>> we can just focus on adding support for Headers in the DSL in this KIP by
>>> building off of the groundwork of KIP-478? It doesn't seem necessary to go
>>> back and add support for headers in the old PAPI, since this will be (or
>>> already has been?) deprecated.
>>>
>>> The one challenge is that this will presumably require that we migrate all
>>> DSL operators to the new PAPI before adding header support for those
>>> operators. But that definitely sounds achievable here.
>>>
>>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
>>> quilcate.jo...@gmail.com> wrote:
>>>
>>> > Hi Matthias,
>>> >
>>> > Sorry for the late reply.
>>> >
>>> > I like the proposal. Just to check if I got it right:
>>> >
>>> > We can extend the `kstream.to()` function to support setting headers,
>>> > e.g.:
>>> >
>>> > ```
>>> >     void to(final String topic,
>>> >             final Produced<K, V> produced,
>>> >             final HeadersExtractor<K, V> headersExtractor);
>>> > ```
>>> >
>>> > where `HeadersExtractor`:
>>> >
>>> > ```
>>> > public interface HeadersExtractor<K, V> {
>>> >     Headers extract(final K key, final V value, final RecordContext recordContext);
>>> > }
>>> > ```
>>> >
>>> > This would require changing `Topology#addSink()` to support this
>>> > extractor as well.
>>> >
>>> > If this is aligned with your proposal, I'm happy to add it to this KIP.
>>> >
>>> > Cheers,
>>> > Jorge.
>>> >
>>> > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> >
>>> > > Jorge,
>>> > >
>>> > > thanks a lot for this KIP. Being able to modify headers is a very
>>> > > valuable feature.
>>> > >
>>> > > However, before we actually expose them in the DSL, I am wondering if we
>>> > > should improve how headers can be modified in the PAPI? Currently, it is
>>> > > possible but very clumsy to work with headers in the Processor API,
>>> > > for two reasons:
>>> > >
>>> > >  (1) There is no default `Headers` implementation in the public API
>>> > >  (2) There is no explicit way to set headers for output records
>>> > >
>>> > > Currently, the input record headers are copied into the output records
>>> > > when `forward()` is called. However, it's not really a deep copy; we
>>> > > just copy the reference. This implies that one needs to work with a
>>> > > single mutable object that flows through multiple processors, making it
>>> > > very error prone.
>>> > >
>>> > > Furthermore, if you want to emit multiple output records, and for
>>> > > example want to add a different header to each output record (based on
>>> > > the same input headers), you would need to do something like this:
>>> > >
>>> > >   Headers h = context.headers();
>>> > >   h.add(...);
>>> > >   context.forward(...);
>>> > >   // remove the header you added for the first output record
>>> > >   h.remove(...);
>>> > >   h.add(...);
>>> > >   context.forward(...);
>>> > >
>>> > >
>>> > > Maybe we could extend `To` to allow passing in a new `Headers` object
>>> > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
>>> > > add it to your KIP or do a new KIP just for the PAPI.
>>> > >
>>> > > Thoughts?
>>> > >
>>> > >
>>> > > -Matthias
>>> > >
>>> > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
>>> > > > Hi everyone,
>>> > > >
>>> > > > Bumping this thread to check if there's any feedback.
>>> > > >
>>> > > > Cheers,
>>> > > > Jorge.
>>> > > >
>>> > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
>>> > > > quilcate.jo...@gmail.com> wrote:
>>> > > >
>>> > > >> Hi everyone,
>>> > > >>
>>> > > >> I would like to start the discussion for KIP-634:
>>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>>> > > >>
>>> > > >> Looking forward to your feedback.
>>> > > >>
>>> > > >> Thanks!
>>> > > >> Jorge.
>>> > > >>
>>> > > >>
>>> > > >>
>>> > > >>
>>> > > >
>>> > >
>>> > >
>>> >
>>>
>>

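A minimal Java sketch of the shared-reference problem described in the Sep 11, 2020 message quoted above, where one mutable headers object is reused across several forwarded records. It deliberately uses plain stand-in types rather than Kafka classes: `Forwarded`, `forwardSharingReference`, and `forwardWithCopies` are hypothetical names for illustration, a `List<String>` stands in for `Headers`, and the sketch assumes that something downstream keeps the forwarded reference instead of reading it immediately.

```java
import java.util.ArrayList;
import java.util.List;

public class SharedHeadersSketch {

    // Hypothetical stand-in for a forwarded record (not a Kafka class).
    // It stores only a reference to the header list, not a copy.
    static final class Forwarded {
        final String value;
        final List<String> headers;

        Forwarded(String value, List<String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    // Mirrors the add/forward/remove/add/forward pattern from the thread:
    // a single mutable header list is mutated between the two forwards.
    static List<Forwarded> forwardSharingReference() {
        List<String> headers = new ArrayList<>(List.of("trace-id"));
        List<Forwarded> out = new ArrayList<>();

        headers.add("route-a");
        out.add(new Forwarded("first", headers));

        // Undo the header added for the first output, then add the second one.
        headers.remove("route-a");
        headers.add("route-b");
        out.add(new Forwarded("second", headers));
        return out;
    }

    // Alternative: every output record gets its own copy of the input headers.
    static List<Forwarded> forwardWithCopies() {
        List<String> input = List.of("trace-id");
        List<Forwarded> out = new ArrayList<>();

        List<String> first = new ArrayList<>(input);
        first.add("route-a");
        out.add(new Forwarded("first", first));

        List<String> second = new ArrayList<>(input);
        second.add("route-b");
        out.add(new Forwarded("second", second));
        return out;
    }

    public static void main(String[] args) {
        // With a shared reference, the first output reflects the later mutation.
        System.out.println(forwardSharingReference().get(0).headers);
        // With per-output copies, the first output keeps its own header.
        System.out.println(forwardWithCopies().get(0).headers);
    }
}
```

Under those assumptions, the first method leaves both outputs pointing at the same list, so the first output ends up carrying the second output's header. The second method gives each output an independent copy, which is roughly what passing a fresh `Headers` object per forward would make possible.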