[ https://issues.apache.org/jira/browse/KAFKA-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17596889#comment-17596889 ]
Clive Cox commented on KAFKA-7718:
----------------------------------
Any update on this feature?
> Allow customized header inheritance for stateful operators in DSL
> -----------------------------------------------------------------
>
> Key: KAFKA-7718
> URL: https://issues.apache.org/jira/browse/KAFKA-7718
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip
>
> As a follow-up to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
> we want to allow users to customize how record headers are inherited
> while traversing the topology at the DSL layer (at the lower-level Processor
> API layer, users are already capable of customizing and inheriting the
> headers as they forward records to the next processor nodes).
> Today the headers are implicitly inherited throughout the topology without
> any modifications within the Streams library. For stateless operators
> (filter, map, etc.) this default inheritance policy should be sufficient. For
> stateful operators, where multiple input records may generate a single
> output record (i.e., an n:1 transformation rather than a 1:1 mapping), we
> only inherit the headers of the triggering record. This would seem a "random"
> choice to users, and the other records' headers are lost.
> I'd propose we extend the DSL to allow users to customize the header inheritance
> policy for stateful operators, namely joins and aggregations. It would
> contain two parts:
> 1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control
> objects with an additional function that allows users to pass in a lambda
> (let's say it's called HeadersMerger, but the name is subject to discussion
> in the KIP) that takes two Headers objects and returns a single Headers object.
> 2) On the implementation layer, we need to actually store the headers in the
> materialized state store so that they can be retrieved along with the record
> by the join / aggregation processor. This would change the byte layout of
> state store values and hence should be considered carefully. Then, when the
> join / aggregate processor is triggered, the Headers of both records will be
> retrieved (one from the triggering record, one read from the materialized
> state store) and then passed to the HeadersMerger. Some low-hanging
> optimizations can be considered, though; e.g., if users have not overridden
> this interface, then we can consider not reading the headers from the other
> side at all to save IO cost.
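The two parts of the proposal could be sketched roughly as follows, under stated assumptions: `HeadersMerger` uses the placeholder name from the proposal, while `StoreValueCodec`, `JoinSketch`, the `TRIGGERING_ONLY` default, and the length-prefixed byte layout are hypothetical illustrations rather than Kafka APIs or an agreed format. Headers are again simplified to a `Map<String, String>`, whereas real Kafka headers are `(String, byte[])` pairs that may repeat keys.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Part 1 (hypothetical shape): a user-supplied function that merges the headers of the
// triggering record with those of the record read from the state store.
@FunctionalInterface
interface HeadersMerger {
    Map<String, String> merge(Map<String, String> triggering, Map<String, String> stored);

    // Hypothetical default reproducing today's behavior: keep only the triggering record's headers.
    HeadersMerger TRIGGERING_ONLY = (triggering, stored) -> triggering;
}

// Part 2 (hypothetical layout): keep headers next to the value in the state store.
// Layout: [int headerCount] { [int keyLen][key][int valLen][val] }* [int valueLen][value]
final class StoreValueCodec {
    static byte[] encode(Map<String, String> headers, byte[] value) {
        int size = Integer.BYTES; // header count
        for (Map.Entry<String, String> e : headers.entrySet()) {
            size += 2 * Integer.BYTES
                    + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        size += Integer.BYTES + value.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(headers.size());
        for (Map.Entry<String, String> e : headers.entrySet()) {
            putBytes(buf, e.getKey().getBytes(StandardCharsets.UTF_8));
            putBytes(buf, e.getValue().getBytes(StandardCharsets.UTF_8));
        }
        putBytes(buf, value);
        return buf.array();
    }

    static Map<String, String> decodeHeaders(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int count = buf.getInt();
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String key = new String(getBytes(buf), StandardCharsets.UTF_8);
            String val = new String(getBytes(buf), StandardCharsets.UTF_8);
            headers.put(key, val);
        }
        return headers;
    }

    static byte[] decodeValue(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int count = buf.getInt();
        // Skip every header key and value without decoding them.
        for (int i = 0; i < 2 * count; i++) {
            int len = buf.getInt();
            buf.position(buf.position() + len);
        }
        return getBytes(buf);
    }

    private static void putBytes(ByteBuffer buf, byte[] bytes) {
        buf.putInt(bytes.length);
        buf.put(bytes);
    }

    private static byte[] getBytes(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getInt()];
        buf.get(bytes);
        return bytes;
    }
}

final class JoinSketch {
    // Computes the result headers for one join/aggregation step. With the default merger the
    // stored side's headers are never decoded (a stand-in for "not reading them at all").
    static Map<String, String> resultHeaders(HeadersMerger merger,
                                             Map<String, String> triggeringHeaders,
                                             byte[] storedOtherSide) {
        if (merger == HeadersMerger.TRIGGERING_ONLY) {
            return triggeringHeaders;
        }
        Map<String, String> storedHeaders = StoreValueCodec.decodeHeaders(storedOtherSide);
        return merger.merge(triggeringHeaders, storedHeaders);
    }
}
```

Placing headers in front of the value means every read of a stored value must now skip the header section, which illustrates why the value layout change deserves care. The identity check on `TRIGGERING_ONLY` mirrors the proposed optimization; a real store would go further and avoid fetching the other side's header bytes at all.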
--
This message was sent by Atlassian Jira
(v8.20.10#820010)