Hi Dev team, I'd like to revamp the KIP again: https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
- Reference implementation is now using the latest `Processor` API from KIP-478: https://github.com/apache/kafka/pull/10265/files for both Processors backing changes on the KStream API. - It is proposing to still extend `To` class for backwards compatibility. Looking forward to your feedback. Regards, Jorge. On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya < quilcate.jo...@gmail.com> wrote: > Hi everyone! > > I'd like to revamp this KIP. I have made some significant changes on the > scope: > - Added `mapRecordValue` to map not only headers, but other record > metadata: topic name, partition, offset, and timestamp into a new type > `RecordValue<V>`. > - Added a serde for `RecordValue` to support stateful operations. > - Added `setRecordHeaders` to apply headers to record crossing the stream. > - Added headers to `To` to update headers via `context.forward(k, v, to)`. > > New link: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL > > Looking forward to your feedback, > > Cheers and stay safe, > Jorge. > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya < > quilcate.jo...@gmail.com> wrote: > >> Thanks Sophie! Haven't followed KIP-478 but sounds great. >> I'll be happy to help on that migration to the new PAPI if it's still an >> open issue. We can bump this KIP after that. >> >> Cheers, >> Jorge. >> >> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io> >> wrote: >> >>> I *think* that the `To` Matthias was referring to was not KStream#to but >>> the To class >>> which is accepted as a possible parameter of ProcessorContext#forward >>> (correct >>> me if wrong). >>> >>> This was on the old ProcessorContext interface, which has now been >>> replaced with >>> the new api.ProcessorContext in KIP-478. In the new interface we've moved >>> away >>> from the forward signatures that accept a separate key or value or >>> timestamp or To, >>> and wrapped all of these into a single Record class. This new Record >>> class >>> has the >>> headers as a field, so it seems like KIP-478 has happened to solve the >>> lack >>> of support >>> for Headers in the PAPI along the way. >>> >>> This is all somewhat recent, and probably wasn't yet sorted out at the >>> time >>> of Matthias' >>> last reply. But given how this worked out it seems like we can just focus >>> on adding >>> support for Headers in the DSL in this KIP by building off of the >>> groundwork of >>> KIP-478? It doesn't seem necessary to go back and add support for headers >>> in the old >>> PAPI, since this will (or already has?) been deprecated. >>> >>> The one challenge is that this will presumably require that we migrate >>> all >>> DSL operators >>> to the new PAPI before adding header support for those operators. But >>> that >>> definitely >>> sounds achievable here >>> >>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya < >>> quilcate.jo...@gmail.com> wrote: >>> >>> > Hi Matthias, >>> > >>> > Sorry for the late reply. >>> > >>> > I like the proposal. Just to check if I got it right: >>> > >>> > We can extend the `kstream.to()` function to support setting headers. >>> > e.g.: >>> > >>> > ``` >>> > void to(final String topic, >>> > final Produced<K, V> produced, >>> > final HeadersExtractor<K, V> headersExtractor); >>> > ``` >>> > >>> > where `HeadersExtractor`: >>> > >>> > ``` >>> > public interface HeadersExtractor<K, V> { >>> > Headers extract(final K key, final V value, final RecordContext >>> > recordContext); >>> > } >>> > ``` >>> > >>> > This would require to change `Topology#addSink()` to support this >>> > extractor as well. >>> > >>> > If this is aligned with your proposal, I'm happy to add it to this KIP. >>> > >>> > Cheers, >>> > Jorge. >>> > >>> > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> >>> wrote: >>> > >>> > > Jorge, >>> > > >>> > > thanks a lot for this KIP. Being able to modify headers is a very >>> > > valuable feature. >>> > > >>> > > However, before we actually expose them in the DSL, I am wondering >>> if we >>> > > should improve how headers can be modified in the PAPI? Currently, >>> it is >>> > > possible but very clumsy to work with headers in the Processor API, >>> > > because of two reasons: >>> > > >>> > > (1) There is no default `Headers` implementation in the public API >>> > > (2) There is no explicit way to set headers for output records >>> > > >>> > > Currently, the input record headers are copied into the output >>> records >>> > > when `forward()` is called, however, it's not really a deep copy but >>> we >>> > > just copy the reference. This implies that one needs to work with a >>> > > single mutable object that flows through multiple processors making >>> it >>> > > very error prone. >>> > > >>> > > Furthermore, if you want to emit multiple output records, and for >>> > > example want to add two different headers to the output record >>> (based on >>> > > the same input headers), you would need to do something like this: >>> > > >>> > > Headers h = context.headers(); >>> > > h.add(...); >>> > > context.forward(...); >>> > > // remove the header you added for the first output record >>> > > h.remove(...); >>> > > h.add(...); >>> > > context.forward(...); >>> > > >>> > > >>> > > Maybe we could extend `To` to allow passing in a new `Headers` object >>> > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We could >>> either >>> > > add it to your KIP or do a new KIP just for the PAPI. >>> > > >>> > > Thoughts? >>> > > >>> > > >>> > > -Matthias >>> > > >>> > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote: >>> > > > Hi everyone, >>> > > > >>> > > > Bumping this thread to check if there's any feedback. >>> > > > >>> > > > Cheers, >>> > > > Jorge. >>> > > > >>> > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya < >>> > > > quilcate.jo...@gmail.com> wrote: >>> > > > >>> > > >> Hi everyone, >>> > > >> >>> > > >> I would like to start the discussion for KIP-634: >>> > > >>> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL >>> > > >> >>> > > >> Looking forward to your feedback. >>> > > >> >>> > > >> Thanks! >>> > > >> Jorge. >>> > > >> >>> > > >> >>> > > >> >>> > > >> >>> > > > >>> > > >>> > > >>> > >>> >>