Guozhang Wang created KAFKA-7718:
------------------------------------

             Summary: Allow customized header inheritance for stateful operators in DSL
                 Key: KAFKA-7718
                 URL: https://issues.apache.org/jira/browse/KAFKA-7718
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang


As a follow-up to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
we want to allow users to customize how record headers are inherited 
while traversing the topology at the DSL layer (at the lower-level Processor 
API layer, users can already customize and inherit the headers 
as they forward records to the next processor nodes).

Today the headers are implicitly inherited throughout the topology without any 
modification within the Streams library. For stateless operators (filter, map, 
etc.) this default inheritance policy should be sufficient. For stateful 
operators, however, multiple input records may generate a single output record 
(i.e. an n:1 transformation rather than a 1:1 mapping). Since we only inherit 
the headers of the triggering record, the result would seem like a "random" 
choice to users, and the other records' headers are lost.

I'd propose we extend the DSL to allow users to customize the header inheritance 
policy for stateful operators, namely Joins and Aggregations. The proposal has 
two parts:

1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control 
objects with an additional function that allows users to pass in a lambda 
(let's say it's called HeadersMerger, but the name is subject to discussion in 
the KIP) that takes two Headers objects and returns a single Headers object.
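
A minimal Java sketch of what such a merger could look like. `HeadersMerger` is only the placeholder name from this ticket; the signature used here, the `TRIGGERING_ONLY` and `CONCAT` policies, and the `Header` class are all hypothetical. `Header` is a simplified stand-in for Kafka's `org.apache.kafka.common.header.Headers` so that the example compiles without the Kafka jar. `TRIGGERING_ONLY` models today's implicit behavior, while `CONCAT` is one possible user-supplied policy.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HeadersMergerSketch {

    // Simplified stand-in for one Kafka record header: a String key plus raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + new String(value, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical merger: combines two sets of headers into one.
    @FunctionalInterface
    interface HeadersMerger {
        List<Header> merge(List<Header> triggering, List<Header> other);
    }

    // Models today's implicit policy: only the triggering record's headers survive.
    static final HeadersMerger TRIGGERING_ONLY = (triggering, other) -> triggering;

    // Example user policy: keep every header from both records,
    // the other record's headers first, then the triggering record's.
    static final HeadersMerger CONCAT = (triggering, other) -> {
        List<Header> merged = new ArrayList<>(other);
        merged.addAll(triggering);
        return Collections.unmodifiableList(merged);
    };

    // Convenience factory for UTF-8 string-valued headers.
    static Header header(String key, String value) {
        return new Header(key, value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<Header> triggering = List.of(header("trace-id", "t-2"));
        List<Header> stored = List.of(header("trace-id", "t-1"), header("source", "orders"));

        System.out.println(TRIGGERING_ONLY.merge(triggering, stored)); // prints [trace-id=t-2]
        System.out.println(CONCAT.merge(triggering, stored));
        // prints [trace-id=t-1, source=orders, trace-id=t-2]
    }
}
```

Since a single lambda receives both sides, users could just as well implement "last write wins per key" or "drop everything" without any further DSL changes.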

2) On the implementation layer, we need to actually store the headers in the 
materialized state store so that they can be retrieved along with the record 
by the join / aggregation processor. This would change the byte layout of 
state store values and hence should be considered carefully. Then, when the 
join / aggregate processor is triggered, the Headers of both records will be 
retrieved (one from the triggering record, one read from the materialized state 
store) and passed to the HeadersMerger. Some low-hanging optimizations can also 
be considered, e.g. if users have not overridden this interface, we can skip 
reading the headers from the other side entirely to save IO cost.
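
A minimal, hypothetical sketch of the processor-side flow for a sum aggregation. It assumes an in-memory stand-in for the state store (`Store`) that keeps each aggregate's headers next to its value; a real store would have to encode them into the value bytes, which is the layout question raised above. All names here (`HeaderAwareAggregationSketch`, `Store.readHeaders`, `DEFAULT`, `UNION`) are illustrative, not Kafka Streams APIs, and headers are simplified to `"key=value"` strings. Comparing against the `DEFAULT` instance stands in for "the user has not overridden the merger", which lets the processor skip the stored-header read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class HeaderAwareAggregationSketch {

    // Hypothetical merger over (triggering, stored) headers; headers are
    // simplified to "key=value" strings to keep the sketch self-contained.
    interface HeadersMerger extends BinaryOperator<List<String>> {}

    // Stands for "user did not override": keep only the triggering record's headers.
    static final HeadersMerger DEFAULT = (triggering, stored) -> triggering;

    // Example override: union of stored and triggering headers, without duplicates.
    static final HeadersMerger UNION = (triggering, stored) -> {
        LinkedHashSet<String> merged = new LinkedHashSet<>(stored);
        merged.addAll(triggering);
        return new ArrayList<>(merged);
    };

    // In-memory stand-in for a state store whose entries also carry headers.
    static final class Store {
        final Map<String, Long> values = new HashMap<>();
        final Map<String, List<String>> headers = new HashMap<>();
        int headerReads = 0; // counts fetches of stored headers

        List<String> readHeaders(String key) {
            headerReads++;
            return headers.getOrDefault(key, List.of());
        }
    }

    final Store store = new Store();
    final HeadersMerger merger;

    HeaderAwareAggregationSketch(HeadersMerger merger) {
        this.merger = merger;
    }

    // Sums values per key; returns the headers attached to the updated aggregate.
    List<String> process(String key, long value, List<String> recordHeaders) {
        long newAggregate = store.values.getOrDefault(key, 0L) + value;
        List<String> outHeaders;
        if (merger == DEFAULT) {
            // Optimization: the default policy ignores stored headers, so never read them.
            outHeaders = recordHeaders;
        } else {
            outHeaders = merger.apply(recordHeaders, store.readHeaders(key));
        }
        // Persist the aggregate together with its headers.
        store.values.put(key, newAggregate);
        store.headers.put(key, outHeaders);
        return outHeaders;
    }

    public static void main(String[] args) {
        HeaderAwareAggregationSketch plain = new HeaderAwareAggregationSketch(DEFAULT);
        plain.process("user-1", 5, List.of("src=web"));
        System.out.println(plain.process("user-1", 3, List.of("src=app"))); // prints [src=app]
        System.out.println(plain.store.headerReads); // prints 0

        HeaderAwareAggregationSketch merging = new HeaderAwareAggregationSketch(UNION);
        merging.process("user-1", 5, List.of("src=web"));
        System.out.println(merging.process("user-1", 3, List.of("src=app"))); // prints [src=web, src=app]
        System.out.println(merging.store.headerReads); // prints 2
    }
}
```

The `headerReads` counter only makes the skipped reads visible; it is not a cost model for a real RocksDB-backed store.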



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
